You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:32 UTC

[07/50] [abbrv] carbondata git commit: [CARBONDATA-2498] Change CarbonWriterBuilder interface to take schema while creating writer

[CARBONDATA-2498] Change CarbonWriterBuilder interface to take schema while creating writer

This closes #2316


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/26eb2d0b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/26eb2d0b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/26eb2d0b

Branch: refs/heads/spark-2.3
Commit: 26eb2d0b0e795c098d064471f7387072a74e07e5
Parents: 7ef9164
Author: kunal642 <ku...@gmail.com>
Authored: Thu May 17 21:00:50 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon May 21 17:13:12 2018 +0530

----------------------------------------------------------------------
 docs/sdk-writer-guide.md                        |  25 +-
 .../examples/sdk/CarbonReaderExample.java       |   3 +-
 .../carbondata/examples/sdk/SDKS3Example.java   |   3 +-
 .../carbondata/examples/S3UsingSDkExample.scala |   8 +-
 ...FileInputFormatWithExternalCarbonTable.scala |   4 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |   4 +-
 .../TestNonTransactionalCarbonTable.scala       | 302 +++----------------
 ...ransactionalCarbonTableWithComplexType.scala |  53 +---
 ...tSparkCarbonFileFormatWithSparkSession.scala |   4 +-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |   8 +-
 .../sdk/file/CarbonWriterBuilder.java           |  24 +-
 .../sdk/file/AvroCarbonWriterTest.java          |  15 +-
 .../sdk/file/CSVCarbonWriterTest.java           |  10 +-
 .../CSVNonTransactionalCarbonWriterTest.java    |   6 +-
 .../apache/carbondata/sdk/file/TestUtil.java    |   3 +-
 15 files changed, 82 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/docs/sdk-writer-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-writer-guide.md b/docs/sdk-writer-guide.md
index 9878b71..682b27a 100644
--- a/docs/sdk-writer-guide.md
+++ b/docs/sdk-writer-guide.md
@@ -33,9 +33,9 @@ These SDK writer output contains just a carbondata and carbonindex files. No met
  
      Schema schema = new Schema(fields);
  
-     CarbonWriterBuilder builder = CarbonWriter.builder().withSchema(schema).outputPath(path);
+     CarbonWriterBuilder builder = CarbonWriter.builder().outputPath(path);
  
-     CarbonWriter writer = builder.buildWriterForCSVInput();
+     CarbonWriter writer = builder.buildWriterForCSVInput(schema);
  
      int rows = 5;
      for (int i = 0; i < rows; i++) {
@@ -87,15 +87,10 @@ public class TestSdkAvro {
     GenericData.Record record = converter.convertToGenericDataRecord(
         json.getBytes(CharEncoding.UTF_8), new org.apache.avro.Schema.Parser().parse(avroSchema));
 
-    // prepare carbon schema from avro schema 
-    org.apache.carbondata.sdk.file.Schema carbonSchema =
-            AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
-
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(carbonSchema)
           .outputPath(path)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(new org.apache.avro.Schema.Parser().parse(avroSchema));
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -130,16 +125,6 @@ Each of SQL data types are mapped into data types of SDK. Following are the mapp
 ### Class org.apache.carbondata.sdk.file.CarbonWriterBuilder
 ```
 /**
-* prepares the builder with the schema provided
-* @param schema is instance of Schema
-*        This method must be called when building CarbonWriterBuilder
-* @return updated CarbonWriterBuilder
-*/
-public CarbonWriterBuilder withSchema(Schema schema);
-```
-
-```
-/**
 * Sets the output path of the writer builder
 * @param path is the absolute path where output files are written
 *             This method must be called when building CarbonWriterBuilder
@@ -259,6 +244,7 @@ public CarbonWriterBuilder withLoadOptions(Map<String, String> options);
 ```
 /**
 * Build a {@link CarbonWriter}, which accepts row in CSV format object
+* @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
 * @return CSVCarbonWriter
 * @throws IOException
 * @throws InvalidLoadOptionException
@@ -269,6 +255,7 @@ public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOpti
 ```  
 /**
 * Build a {@link CarbonWriter}, which accepts Avro format object
+* @param avroSchema avro Schema object {org.apache.avro.Schema}
 * @return AvroCarbonWriter 
 * @throws IOException
 * @throws InvalidLoadOptionException
@@ -356,4 +343,4 @@ public static Schema parseJson(String json);
 * @return carbon sdk schema
 */
 public static org.apache.carbondata.sdk.file.Schema getCarbonSchemaFromAvroSchema(String avroSchemaString);
-```
\ No newline at end of file
+```

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index 8ea8604..937bfa0 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -44,11 +44,10 @@ public class CarbonReaderExample {
             fields[1] = new Field("age", DataTypes.INT);
 
             CarbonWriter writer = CarbonWriter.builder()
-                    .withSchema(new Schema(fields))
                     .isTransactionalTable(true)
                     .outputPath(path)
                     .persistSchemaFile(true)
-                    .buildWriterForCSVInput();
+                    .buildWriterForCSVInput(new Schema(fields));
 
             for (int i = 0; i < 10; i++) {
                 writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index 02247cb..7fab2cc 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -66,7 +66,6 @@ public class SDKS3Example {
         fields[0] = new Field("name", DataTypes.STRING);
         fields[1] = new Field("age", DataTypes.INT);
         CarbonWriterBuilder builder = CarbonWriter.builder()
-                .withSchema(new Schema(fields))
                 .setAccessKey(args[0])
                 .setSecretKey(args[1])
                 .setEndPoint(args[2])
@@ -74,7 +73,7 @@ public class SDKS3Example {
                 .persistSchemaFile(persistSchema)
                 .isTransactionalTable(transactionalTable);
 
-        CarbonWriter writer = builder.buildWriterForCSVInput();
+        CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
 
         for (int i = 0; i < num; i++) {
             writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index 7ecde88..022b28e 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -49,15 +49,15 @@ object S3UsingSDKExample {
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.withSchema(new Schema(fields)).outputPath(writerPath).isTransactionalTable(true)
+          builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis)
-            .buildWriterForCSVInput()
+            .buildWriterForCSVInput(new Schema(fields))
         } else {
-          builder.withSchema(new Schema(fields)).outputPath(writerPath).isTransactionalTable(true)
+          builder.outputPath(writerPath).isTransactionalTable(true)
             .uniqueIdentifier(
               System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput()
+            .buildWriterForCSVInput(new Schema(fields))
         }
       var i = 0
       var row = num

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 9646c1d..019b915 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -59,9 +59,9 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
       val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)
-        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
       } else {
-        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+        builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
       }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index 16f19a7..66be8e4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -68,9 +68,9 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
         } else {
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
         }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index a15d0c2..1c74adc 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -129,27 +129,27 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.withSchema(Schema.parseJson(schema))
+          builder
             .sortBy(sortColumns.toArray)
             .outputPath(writerPath)
             .isTransactionalTable(false)
             .uniqueIdentifier(System.currentTimeMillis)
-            .buildWriterForCSVInput()
+            .buildWriterForCSVInput(Schema.parseJson(schema))
         } else {
           if (options != null) {
-            builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+            builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2).withLoadOptions(options)
-              .buildWriterForCSVInput()
+              .buildWriterForCSVInput(Schema.parseJson(schema))
           } else {
-            builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+            builder.outputPath(writerPath)
               .isTransactionalTable(false)
               .sortBy(sortColumns.toArray)
               .uniqueIdentifier(
                 System.currentTimeMillis).withBlockSize(2)
-              .buildWriterForCSVInput()
+              .buildWriterForCSVInput(Schema.parseJson(schema))
           }
         }
       var i = 0
@@ -185,10 +185,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        builder.withSchema(new Schema(fields)).outputPath(writerPath)
+        builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.currentTimeMillis()).withBlockSize(2).sortBy(sortColumns)
-          .buildWriterForCSVInput()
+          .buildWriterForCSVInput(new Schema(fields))
 
       var i = 0
       while (i < rows) {
@@ -218,12 +218,12 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     try {
       val builder = CarbonWriter.builder()
       val writer =
-        builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath)
+        builder.outputPath(writerPath)
           .isTransactionalTable(false)
           .sortBy(sortColumns.toArray)
           .uniqueIdentifier(
             123).withBlockSize(2)
-          .buildWriterForCSVInput()
+          .buildWriterForCSVInput(Schema.parseJson(schema))
       var i = 0
       while (i < rows) {
         writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
@@ -915,10 +915,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     fields(2) = new Field("mydate", DataTypes.DATE)
     fields(3) = new Field("mytime", DataTypes.TIMESTAMP)
 
-    val builder: CarbonWriterBuilder = CarbonWriter.builder.withSchema(new Schema(fields))
+    val builder: CarbonWriterBuilder = CarbonWriter.builder
       .outputPath(writerPath).isTransactionalTable(false).withLoadOptions(options)
 
-    val writer: CarbonWriter = builder.buildWriterForCSVInput
+    val writer: CarbonWriter = builder.buildWriterForCSVInput(new Schema(fields))
     writer.write(Array("babu","1","02-01-2002","02-01-2002 01:01:00"));
     writer.close()
 
@@ -1014,8 +1014,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
   private def WriteFilesWithAvroWriter(rows: Int,
       mySchema: String,
-      json: String,
-      fields: Array[Field]) = {
+      json: String) = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
     val converter = new JsonAvroConverter
@@ -1023,9 +1022,9 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
 
     try {
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+      val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -1061,15 +1060,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val json = """ {"name":"bob", "age":10.24, "address" : {"street":"abc", "city":"bang"}} """
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.DOUBLE)
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fields(2) = new Field("address", "struct", fld)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataStructType(): Any = {
@@ -1109,16 +1100,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
     val json: String = """ {"name": "bob","age": 10,"address": ["abc", "defg"]} """
 
-
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    // fields[1] = new Field("age", DataTypes.INT);
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fields(2) = new Field("address", "array", fld)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataSingleFileArrayType(): Any = {
@@ -1163,17 +1145,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |"address" : {"street":"abc", "city":"bang"},
         |"doorNum" : [1,2,3,4]}""".stripMargin
 
-    val fields = new Array[Field](4)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fields(2) = new Field("address", "struct", fld)
-    val fld1 = new util.ArrayList[StructField]
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-    fields(3) = new Field("doorNum", "array", fld1)
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataBothStructArrayType(): Any = {
@@ -1227,19 +1199,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |{"street":"ghi","city":"city3"},
         |{"street":"jkl","city":"city4"}]} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataArrayOfStructType(): Any = {
@@ -1310,20 +1270,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
                  |	}
                  |} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("street", DataTypes.STRING))
-    fld2.add(new StructField("city", DataTypes.STRING))
-
-    val fld1 = new util.ArrayList[StructField]
-    fld1.add(new StructField("eachDoorNum", DataTypes.INT))
-    fld2.add(new StructField("doorNum", DataTypes.createArrayType(DataTypes.INT), fld1))
-
-    fields(2) = new Field("address","struct",fld2)
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataStructOfArrayType(): Any = {
@@ -1376,19 +1323,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |{"street":"ghi","city":"city3"},
         |{"street":"jkl","city":"city4"}]} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("exp", DataTypes.INT)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   test("Read sdk writer Avro output Record Type with no sort columns") {
@@ -1486,15 +1421,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     // skip giving array value to take default values
     val json: String = "{\"name\": \"bob\",\"age\": 10}"
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-    // fields[1] = new Field("age", DataTypes.INT);
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fields(2) = new Field("address", "array", fld)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataSingleFileArrayDefaultType(): Any = {
@@ -1680,20 +1607,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |	]
         |} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-    fld.add(new StructField("FloorNum", DataTypes.createArrayType(DataTypes.INT)))
-
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3Type(): Any = {
@@ -1812,25 +1726,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |	]
         |}  """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("street", DataTypes.STRING))
-    fld.add(new StructField("city", DataTypes.STRING))
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("wing", DataTypes.STRING))
-    subFld.add(new StructField("number", DataTypes.INT))
-    fld.add(new StructField("FloorNum", DataTypes.createStructType(subFld)))
-
-    // array of struct of struct
-    val fld2 = new util.ArrayList[StructField]
-    fld2.add(new StructField("my_address", DataTypes.createStructType(fld), fld))
-    fields(2) = new Field("doorNum", DataTypes.createArrayType(fld2.get(0).getDataType), fld2)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3_1Type(): Any = {
@@ -1910,22 +1806,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |        	"BuildNum": [[[1,2,3],[4,5,6]],[[10,20,30],[40,50,60]]]
         |        }   """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("DoorNum", DataTypes.createArrayType(DataTypes.INT), subFld))
-    // array of struct of struct
-    val doorNum = new util.ArrayList[StructField]
-    doorNum.add(new StructField("FloorNum",
-      DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.INT)), fld))
-    fields(2) = new Field("BuildNum", "array", doorNum)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel3_2Type(): Any = {
@@ -2045,29 +1926,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
         |	]
         |} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
-
-    val address = new util.ArrayList[StructField]
-    address.add(new StructField("street", DataTypes.STRING))
-    address.add(new StructField("city", DataTypes.STRING))
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("DoorNum",
-        DataTypes.createArrayType(DataTypes.createStructType(address)),
-        subFld))
-    // array of struct of struct
-    val doorNum = new util.ArrayList[StructField]
-    doorNum.add(new StructField("FloorNum",
-      DataTypes.createArrayType(
-        DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
-    fields(2) = new Field("BuildNum", "array", doorNum)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel4Type(): Any = {
@@ -2130,15 +1989,9 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = converter
       .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
 
-    val fields = new Array[Field](2)
-    fields(0) = new Field("id", DataTypes.INT)
-    val fld_s = new java.util.ArrayList[StructField]
-    fld_s.add(new StructField("course_struct_course_time", DataTypes.STRING))
-    fields(1) = new Field("course_details", "struct", fld_s)
-
     assert(intercept[RuntimeException] {
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("name", "id"))
-        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+      val writer = CarbonWriter.builder.sortBy(Array("name", "id"))
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
       writer.write(record)
       writer.close()
     }.getMessage.toLowerCase.contains("column: name specified in sort columns"))
@@ -2179,8 +2032,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = converter
       .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
 
-    val writer = CarbonWriter.builder.withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(schema1))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
     writer.write(record)
     writer.close()
   }
@@ -2219,14 +2072,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = converter
       .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
 
-    val fields = new Array[Field](2)
-    fields(0) = new Field("id", DataTypes.INT)
-    val fld_s = new java.util.ArrayList[StructField]
-    fld_s.add(new StructField("course_struct_course_time", DataTypes.TIMESTAMP))
-    fields(1) = new Field("course_details", "struct", fld_s)
-
-    val writer = CarbonWriter.builder.withSchema(new Schema(fields)).sortBy(Array("id"))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    val writer = CarbonWriter.builder.sortBy(Array("id"))
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
     writer.write(record)
     writer.close()
   }
@@ -2271,91 +2118,10 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     val record = converter
       .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
 
-    val fields = new Array[Field](2)
-    fields(0) = new Field("id", DataTypes.LONG)
-    val fld_s = new java.util.ArrayList[StructField]
-    fld_s.add(new StructField("id", DataTypes.LONG))
-    fields(1) = new Field("entries", DataTypes.createArrayType(DataTypes.createStructType(fld_s)))
-    val writer = CarbonWriter.builder.withSchema(new Schema(fields))
-      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+    val writer = CarbonWriter.builder
+      .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput(nn)
     writer.write(record)
     writer.close()
   }
 
-  test("test if data load with various bad_records_action") {
-    val schema =
-      """{
-        |	"namespace": "com.apache.schema",
-        |	"type": "record",
-        |	"name": "StudentActivity",
-        |	"fields": [
-        |		{
-        |			"name": "id",
-        |			"type": "string"
-        |		},
-        |		{
-        |			"name": "course_details",
-        |			"type": {
-        |				"name": "course_details",
-        |				"type": "record",
-        |				"fields": [
-        |					{
-        |						"name": "course_struct_course_string",
-        |						"type": "string"
-        |					}
-        |				]
-        |			}
-        |		},
-        |		{
-        |			"name": "salary_string",
-        |			"type": {
-        |				"type": "array",
-        |				"items": "string"
-        |			}
-        |		}
-        |	]
-        |}""".stripMargin
-    val json1 =
-      """{
-        |	"id": "cust_1",
-        |	"course_details": {
-        |		"course_struct_course_string": "asd"
-        |	},
-        |	"salary_string": [
-        |		"xyz",
-        |		"abc"
-        |	]
-        |}""".stripMargin
-
-    val nn = new org.apache.avro.Schema.Parser().parse(schema)
-    val converter = new JsonAvroConverter
-    val record = converter
-      .convertToGenericDataRecord(json1.getBytes(CharEncoding.UTF_8), nn)
-
-    val fields = new Array[Field](3)
-    fields(0)=new Field("id", DataTypes.STRING)
-    val fld_s = new java.util.ArrayList[StructField]
-    fld_s.add(new StructField("carbon_int", DataTypes.INT))
-    fields(1)=new Field("course_details", "struct",fld_s)
-
-    val fld_a = new java.util.ArrayList[StructField]
-    fld_a.add(new StructField("carbon_array", DataTypes.INT))
-    fields(2)=new Field("salary_string", "array",fld_a)
-
-    val loadOptions = new util.HashMap[String, String]()
-    loadOptions.put("bad_records_action", "fail")
-    assert(intercept[Exception] {
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
-        .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
-      writer.write(record)
-      writer.close()
-    }.getMessage.contains("Data load failed due to bad record"))
-
-    loadOptions.put("bad_records_action", "FORCE")
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields)).outputPath(writerPath)
-        .isTransactionalTable(false).withLoadOptions(loadOptions).buildWriterForAvroInput
-      writer.write(record)
-      writer.close()
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index 7f9023b..d4de428 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -19,9 +19,6 @@ package org.apache.carbondata.spark.testsuite.createTable
 
 import java.io.File
 import java.util
-import java.util.ArrayList
-
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.avro
 import org.apache.commons.io.FileUtils
@@ -32,9 +29,8 @@ import org.scalatest.BeforeAndAfterAll
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.sdk.file.CarbonWriter
 
 class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with BeforeAndAfterAll {
 
@@ -64,8 +60,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
 
   private def WriteFilesWithAvroWriter(rows: Int,
       mySchema: String,
-      json: String,
-      fields: Array[Field]) = {
+      json: String) = {
     // conversion to GenericData.Record
     val nn = new avro.Schema.Parser().parse(mySchema)
     val converter = new JsonAvroConverter
@@ -73,9 +68,9 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
       .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
 
     try {
-      val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+      val writer = CarbonWriter.builder
         .outputPath(writerPath).isTransactionalTable(false)
-        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput
+        .uniqueIdentifier(System.currentTimeMillis()).buildWriterForAvroInput(nn)
       var i = 0
       while (i < rows) {
         writer.write(record)
@@ -182,32 +177,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
         |	]
         |} """.stripMargin
 
-    val fields = new Array[Field](3)
-    fields(0) = new Field("name", DataTypes.STRING)
-    fields(1) = new Field("age", DataTypes.INT)
-
-    val subFld = new util.ArrayList[StructField]
-    subFld.add(new StructField("EachDoorNum", DataTypes.INT))
-
-    val address = new util.ArrayList[StructField]
-    address.add(new StructField("street", DataTypes.STRING))
-    address.add(new StructField("city", DataTypes.STRING))
-    address.add(new StructField("Temperature", DataTypes.DOUBLE))
-    address.add(new StructField("WindSpeed", DataTypes.createDecimalType(6,2)))
-    address.add(new StructField("year", DataTypes.DATE))
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("DoorNum",
-      DataTypes.createArrayType(DataTypes.createStructType(address)),
-      subFld))
-    // array of struct of struct
-    val doorNum = new util.ArrayList[StructField]
-    doorNum.add(new StructField("FloorNum",
-      DataTypes.createArrayType(
-        DataTypes.createArrayType(DataTypes.createStructType(address))), fld))
-    fields(2) = new Field("BuildNum", "array", doorNum)
-
-    WriteFilesWithAvroWriter(rows, mySchema, json, fields)
+    WriteFilesWithAvroWriter(rows, mySchema, json)
   }
 
   def buildAvroTestDataMultiLevel4Type(): Any = {
@@ -274,16 +244,7 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
 
     val records=new JsonAvroConverter().convertToGenericDataRecord(jsonvalue.getBytes(CharEncoding.UTF_8),pschema)
 
-    val fieds = new Array[Field](3)
-    fieds(0)=new Field("name",DataTypes.STRING);
-    fieds(1)=new Field("age",DataTypes.INT)
-
-    val fld = new util.ArrayList[StructField]
-    fld.add(new StructField("Temperature", DataTypes.DOUBLE))
-    fieds(2) = new Field("my_address", "struct", fld)
-
-
-    val writer=CarbonWriter.builder().withSchema(new Schema(fieds)).outputPath(writerPath).buildWriterForAvroInput()
+    val writer=CarbonWriter.builder().outputPath(writerPath).buildWriterForAvroInput(pschema)
     writer.write(records)
     writer.close()
     sql("DROP TABLE IF EXISTS sdkOutputTable")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 53dadf6..54b23a5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -58,9 +58,9 @@ object TestSparkCarbonFileFormatWithSparkSession {
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
         } else {
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).buildWriterForCSVInput()
+          builder.outputPath(writerPath).buildWriterForCSVInput(Schema.parseJson(schema))
         }
 
       var i = 0

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 9f2f295..8bbf364 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -156,15 +156,11 @@ public class AvroCarbonWriter extends CarbonWriter {
   /**
    * converts avro schema to carbon schema required by carbonWriter
    *
-   * @param avroSchemaString json formatted avro schema as string
+   * @param avroSchema avro schema
    * @return carbon sdk schema
    */
   public static org.apache.carbondata.sdk.file.Schema getCarbonSchemaFromAvroSchema(
-      String avroSchemaString) {
-    if (avroSchemaString == null) {
-      throw new UnsupportedOperationException("avro schema string cannot be null");
-    }
-    Schema avroSchema = new Schema.Parser().parse(avroSchemaString);
+      Schema avroSchema) {
     Field[] carbonField = new Field[avroSchema.getFields().size()];
     int i = 0;
     for (Schema.Field avroField : avroSchema.getFields()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 585975f..bf99e05 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -67,18 +67,6 @@ public class CarbonWriterBuilder {
   private String taskNo;
 
   /**
-   * prepares the builder with the schema provided
-   * @param schema is instance of Schema
-   * This method must be called when building CarbonWriterBuilder
-   * @return updated CarbonWriterBuilder
-   */
-  public CarbonWriterBuilder withSchema(Schema schema) {
-    Objects.requireNonNull(schema, "schema should not be null");
-    this.schema = schema;
-    return this;
-  }
-
-  /**
    * Sets the output path of the writer builder
    * @param path is the absolute path where output files are written
    * This method must be called when building CarbonWriterBuilder
@@ -310,24 +298,30 @@ public class CarbonWriterBuilder {
 
   /**
    * Build a {@link CarbonWriter}, which accepts row in CSV format
+   * @param schema carbon Schema object {org.apache.carbondata.sdk.file.Schema}
    * @return CSVCarbonWriter
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
+  public CarbonWriter buildWriterForCSVInput(Schema schema)
+      throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
+    this.schema = schema;
     CarbonLoadModel loadModel = createLoadModel();
     return new CSVCarbonWriter(loadModel);
   }
 
   /**
    * Build a {@link CarbonWriter}, which accepts Avro object
+   * @param avroSchema avro Schema object {org.apache.avro.Schema}
    * @return AvroCarbonWriter
    * @throws IOException
    * @throws InvalidLoadOptionException
    */
-  public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOptionException {
+  public CarbonWriter buildWriterForAvroInput(org.apache.avro.Schema avroSchema)
+      throws IOException, InvalidLoadOptionException {
+    this.schema = AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema);
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
     CarbonLoadModel loadModel = createLoadModel();
@@ -537,4 +531,4 @@ public class CarbonWriterBuilder {
     setCsvHeader(build);
     return build;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 104c6e4..b70e74d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -73,10 +73,9 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -144,10 +143,9 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(avroSchema))
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(new Schema.Parser().parse(avroSchema));
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -239,10 +237,9 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(nn);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -303,10 +300,9 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(nn);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);
@@ -343,10 +339,9 @@ public class AvroCarbonWriterTest {
 
     try {
       CarbonWriter writer = CarbonWriter.builder()
-          .withSchema(AvroCarbonWriter.getCarbonSchemaFromAvroSchema(mySchema))
           .outputPath(path)
           .isTransactionalTable(true).sortBy(sortColumns)
-          .buildWriterForAvroInput();
+          .buildWriterForAvroInput(nn);
 
       for (int i = 0; i < 100; i++) {
         writer.write(record);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index fc283b6..1eed47b 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -98,11 +98,10 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
           .isTransactionalTable(true)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput();
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{
@@ -225,7 +224,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).withSchema(new Schema(fields)).buildWriterForCSVInput();
+          outputPath(path).buildWriterForCSVInput(new Schema(fields));
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -245,7 +244,7 @@ public class CSVCarbonWriterTest {
     fields[1] = new Field("age", DataTypes.INT);
     try {
       carbonWriter = CarbonWriter.builder().isTransactionalTable(false).
-          outputPath(path).withSchema(new Schema(fields)).buildWriterForCSVInput();
+          outputPath(path).buildWriterForCSVInput(new Schema(fields));
     } catch (InvalidLoadOptionException e) {
       e.printStackTrace();
       Assert.assertTrue(false);
@@ -268,11 +267,10 @@ public class CSVCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
           .isTransactionalTable(true).taskNo(5)
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput();
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
 
       for (int i = 0; i < 2; i++) {
         String[] row = new String[]{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
index 881b5a5..0393077 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -104,7 +104,6 @@ public class CSVNonTransactionalCarbonWriterTest {
       boolean persistSchema, int blockletSize, int blockSize) {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(schema)
           .isTransactionalTable(false)
           .uniqueIdentifier(System.currentTimeMillis())
           .taskNo(System.nanoTime())
@@ -122,7 +121,7 @@ public class CSVNonTransactionalCarbonWriterTest {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.buildWriterForCSVInput();
+      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
@@ -168,13 +167,12 @@ public class CSVNonTransactionalCarbonWriterTest {
 
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
           .uniqueIdentifier(System.currentTimeMillis())
           .isTransactionalTable(false)
           .taskNo(System.nanoTime())
           .outputPath(path);
 
-      CarbonWriter writer = builder.buildWriterForCSVInput();
+      CarbonWriter writer = builder.buildWriterForCSVInput(new Schema(fields));
 
       for (int i = 0; i < 100; i++) {
         String[] row = new String[]{

http://git-wip-us.apache.org/repos/asf/carbondata/blob/26eb2d0b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 97de1a0..eb406e2 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -63,7 +63,6 @@ public class TestUtil {
       boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(schema)
           .isTransactionalTable(isTransactionalTable)
           .outputPath(path);
       if (sortColumns != null) {
@@ -79,7 +78,7 @@ public class TestUtil {
         builder = builder.withBlockSize(blockSize);
       }
 
-      CarbonWriter writer = builder.buildWriterForCSVInput();
+      CarbonWriter writer = builder.buildWriterForCSVInput(schema);
 
       for (int i = 0; i < rows; i++) {
         writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});