Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/20 15:44:35 UTC

[02/10] carbondata git commit: [CARBONDATA-2253][SDK] Support write JSON/Avro data to carbon files

[CARBONDATA-2253][SDK] Support write JSON/Avro data to carbon files

This PR adds AvroCarbonWriter to the SDK; it can be used to write JSON or Avro data to carbon files.

This closes #2061
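
For anyone trying the new API, a minimal usage sketch condensed from the
AvroCarbonWriterTest added in this patch (the output path and sample values are
illustrative, and exception handling is omitted):

    // JSON payload and the matching Avro schema
    String avroSchema = "{ \"type\": \"record\", \"name\": \"Acme\", \"fields\": ["
        + "{ \"name\": \"name\", \"type\": \"string\" },"
        + "{ \"name\": \"age\", \"type\": \"int\" }] }";
    String json = "{\"name\":\"bob\", \"age\":10}";

    // Convert the JSON to an Avro GenericData.Record using json2avro
    // (the converter dependency is added to store/sdk/pom.xml in this diff)
    GenericData.Record record = new JsonAvroConverter().convertToGenericDataRecord(
        json.getBytes(CharEncoding.UTF_8),
        new org.apache.avro.Schema.Parser().parse(avroSchema));

    // Build a writer that accepts Avro records; as in the test, the Avro int
    // field is mapped to a STRING column, since values are stringified on write
    Field[] fields = new Field[] {
        new Field("name", DataTypes.STRING),
        new Field("age", DataTypes.STRING)
    };
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
        .outputPath("./output")
        .buildWriterForAvroInput();
    writer.write(record);
    writer.close();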


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e39b0a14
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e39b0a14
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e39b0a14

Branch: refs/heads/carbonfile
Commit: e39b0a14a196224dbe6fdce2ff53e09b3b76b876
Parents: 04ff367
Author: Jacky Li <ja...@qq.com>
Authored: Wed Mar 14 00:06:37 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 16 23:02:37 2018 +0800

----------------------------------------------------------------------
 pom.xml                                         |   6 -
 store/sdk/pom.xml                               |  95 +------
 .../carbondata/sdk/file/AvroCarbonWriter.java   | 125 +++++++++
 .../sdk/file/CarbonWriterBuilder.java           |  29 +-
 .../sdk/file/AvroCarbonWriterTest.java          | 104 ++++++++
 .../sdk/file/CSVCarbonWriterSuite.java          | 267 -------------------
 .../sdk/file/CSVCarbonWriterTest.java           | 267 +++++++++++++++++++
 7 files changed, 519 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 972be1e..287e052 100644
--- a/pom.xml
+++ b/pom.xml
@@ -579,12 +579,6 @@
       <id>include-all</id>
     </profile>
     <profile>
-      <id>store-sdk</id>
-      <modules>
-        <module>store/sdk</module>
-      </modules>
-    </profile>
-    <profile>
       <id>sdvtest</id>
       <modules>
         <module>integration/spark-common-cluster-test</module>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 9f7038a..1d1735e 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -25,55 +25,27 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
+      <groupId>tech.allegro.schema.json2avro</groupId>
+      <artifactId>converter</artifactId>
+      <version>0.2.5</version>
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.scalatest</groupId>
-      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>
 
   <build>
+    <sourceDirectory>src/main/java</sourceDirectory>
     <resources>
       <resource>
-        <directory>src/resources</directory>
-      </resource>
-      <resource>
         <directory>.</directory>
       </resource>
     </resources>
     <plugins>
       <plugin>
-        <groupId>org.scala-tools</groupId>
-        <artifactId>maven-scala-plugin</artifactId>
-        <version>2.15.2</version>
-        <executions>
-          <execution>
-            <id>compile</id>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-            <phase>compile</phase>
-          </execution>
-          <execution>
-            <id>testCompile</id>
-            <goals>
-              <goal>testCompile</goal>
-            </goals>
-            <phase>test</phase>
-          </execution>
-          <execution>
-            <phase>process-resources</phase>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
           <source>1.7</source>
@@ -82,61 +54,6 @@
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.18</version>
-        <!-- Note config is repeated in scalatest config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-          </systemProperties>
-          <failIfNoTests>false</failIfNoTests>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.scalatest</groupId>
-        <artifactId>scalatest-maven-plugin</artifactId>
-        <version>1.0</version>
-        <!-- Note config is repeated in surefire config -->
-        <configuration>
-          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <junitxml>.</junitxml>
-          <filereports>CarbonTestSuite.txt</filereports>
-          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
-          </argLine>
-          <stderr />
-          <environmentVariables>
-          </environmentVariables>
-          <systemProperties>
-            <java.awt.headless>true</java.awt.headless>
-          </systemProperties>
-        </configuration>
-        <executions>
-          <execution>
-            <id>test</id>
-            <goals>
-              <goal>test</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-deploy-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-install-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-shade-plugin</artifactId>
         <configuration>
           <shadedArtifactAttached>false</shadedArtifactAttached>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
new file mode 100644
index 0000000..e88164c
--- /dev/null
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Writer implementation to write Avro records to carbondata files.
+ */
+@InterfaceAudience.Internal
+class AvroCarbonWriter extends CarbonWriter {
+
+  private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter;
+  private TaskAttemptContext context;
+  private ObjectArrayWritable writable;
+  private Schema avroSchema;
+
+  AvroCarbonWriter(CarbonLoadModel loadModel) throws IOException {
+    Configuration hadoopConf = new Configuration();
+    CarbonTableOutputFormat.setLoadModel(hadoopConf, loadModel);
+    CarbonTableOutputFormat format = new CarbonTableOutputFormat();
+    JobID jobId = new JobID(UUID.randomUUID().toString(), 0);
+    Random random = new Random();
+    TaskID task = new TaskID(jobId, TaskType.MAP, random.nextInt());
+    TaskAttemptID attemptID = new TaskAttemptID(task, random.nextInt());
+    TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID);
+    this.recordWriter = format.getRecordWriter(context);
+    this.context = context;
+    this.writable = new ObjectArrayWritable();
+  }
+
+  private String[] avroToCsv(GenericData.Record avroRecord) {
+    if (avroSchema == null) {
+      avroSchema = avroRecord.getSchema();
+    }
+    List<Schema.Field> fields = avroSchema.getFields();
+    String[] csvField = new String[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
+    }
+    return csvField;
+  }
+
+  private String avroFieldToString(Schema.Field fieldType, Object fieldValue) {
+    StringBuilder out = new StringBuilder();
+    Schema.Type type = fieldType.schema().getType();
+    switch (type) {
+      case BOOLEAN:
+      case INT:
+      case LONG:
+      case DOUBLE:
+      case STRING:
+        out.append(fieldValue.toString());
+        break;
+      default:
+        throw new UnsupportedOperationException();
+      // TODO: convert complex type
+    }
+    return out.toString();
+  }
+
+  /**
+   * Write a single row of data; the input row is an Avro Record
+   */
+  @Override
+  public void write(Object object) throws IOException {
+    GenericData.Record record = (GenericData.Record) object;
+
+    // convert Avro record to CSV String[]
+    String[] csvRecord = avroToCsv(record);
+    writable.set(csvRecord);
+    try {
+      recordWriter.write(NullWritable.get(), writable);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Flush and close the writer
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      recordWriter.close(context);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}
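
The conversion above is positional and string-based: each top-level field of the Avro
record is stringified in schema order (BOOLEAN, INT, LONG, DOUBLE and STRING are
supported; anything else throws UnsupportedOperationException) and handed to
CarbonTableOutputFormat as a CSV row. For illustration (sample values assumed):

    // input Avro record         ->  row passed to the record writer
    // {"name":"bob", "age":10}  ->  new String[] {"bob", "10"}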

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 8734341..5be60c4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -104,7 +104,23 @@ public class CarbonWriterBuilder {
   public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
+    CarbonLoadModel loadModel = createLoadModel();
+    return new CSVCarbonWriter(loadModel);
+  }
+
+  /**
+   * Build a {@link CarbonWriter}, which accepts Avro objects
+   * @return AvroCarbonWriter
+   * @throws IOException
+   */
+  public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOptionException {
+    Objects.requireNonNull(schema, "schema should not be null");
+    Objects.requireNonNull(path, "path should not be null");
+    CarbonLoadModel loadModel = createLoadModel();
+    return new AvroCarbonWriter(loadModel);
+  }
 
+  private CarbonLoadModel createLoadModel() throws IOException, InvalidLoadOptionException {
     // build CarbonTable using schema
     CarbonTable table = buildCarbonTable();
     if (persistSchemaFile) {
@@ -113,18 +129,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    CarbonLoadModel loadModel = buildLoadModel(table);
-    return new CSVCarbonWriter(loadModel);
-  }
-
-  /**
-   * Build a {@link CarbonWriter}, which accepts Avro object
-   * @return
-   * @throws IOException
-   */
-  public CarbonWriter buildWriterForAvroInput() throws IOException {
-    // TODO
-    throw new UnsupportedOperationException();
+    return buildLoadModel(table);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
new file mode 100644
index 0000000..25c34e0
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.CharEncoding;
+import org.junit.Assert;
+import org.junit.Test;
+
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+import org.apache.avro.Schema;
+
+public class AvroCarbonWriterTest {
+  private String path = "./AvroCarbonWriterSuiteWriteFiles";
+
+  @Test
+  public void testWriteBasic() throws IOException {
+    FileUtils.deleteDirectory(new File(path));
+
+    // Avro schema
+    String avroSchema =
+        "{" +
+            "   \"type\" : \"record\"," +
+            "   \"name\" : \"Acme\"," +
+            "   \"fields\" : ["
+            + "{ \"name\" : \"name\", \"type\" : \"string\" },"
+            + "{ \"name\" : \"age\", \"type\" : \"int\" }]" +
+        "}";
+
+    String json = "{\"name\":\"bob\", \"age\":10}";
+
+    // conversion to GenericData.Record
+    JsonAvroConverter converter = new JsonAvroConverter();
+    GenericData.Record record = converter.convertToGenericDataRecord(
+        json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.STRING);
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .outputPath(path)
+          .buildWriterForAvroInput();
+
+      for (int i = 0; i < 100; i++) {
+        writer.write(record);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(1, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteAllPrimitive() throws IOException {
+    // TODO
+  }
+
+  @Test
+  public void testWriteNestedRecord() throws IOException {
+    // TODO
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
deleted file mode 100644
index 0ac6f38..0000000
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.sdk.file;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test suite for {@link CSVCarbonWriter}
- */
-public class CSVCarbonWriterSuite {
-
-  @Test
-  public void testWriteFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testWriteFilesJsonSchema() throws IOException {
-    String path = "./testWriteFilesJsonSchema";
-    FileUtils.deleteDirectory(new File(path));
-
-    String schema = new StringBuilder()
-        .append("[ \n")
-        .append("   {\"name\":\"string\"},\n")
-        .append("   {\"age\":\"int\"},\n")
-        .append("   {\"height\":\"double\"}\n")
-        .append("]")
-        .toString();
-
-    writeFilesAndVerify(Schema.parseJson(schema), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
-  }
-
-  /**
-   * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
-   * @param rows number of rows to write
-   * @param schema schema of the file
-   * @param path local write path
-   * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
-   * @param blockletSize blockletSize in the file, -1 for default size
-   * @param blockSize blockSize in the file, -1 for default size
-   */
-  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize) {
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(schema)
-          .outputPath(path);
-      if (sortColumns != null) {
-        builder = builder.sortBy(sortColumns);
-      }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
-      if (blockletSize != -1) {
-        builder = builder.withBlockletSize(blockletSize);
-      }
-      if (blockSize != -1) {
-        builder = builder.withBlockSize(blockSize);
-      }
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < rows; i++) {
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
-      }
-      writer.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    }
-
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-  }
-
-  @Test
-  public void testAllPrimitiveDataType() throws IOException {
-    // TODO: write all data type and read by CarbonRecordReader to verify the content
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[9];
-    fields[0] = new Field("stringField", DataTypes.STRING);
-    fields[1] = new Field("intField", DataTypes.INT);
-    fields[2] = new Field("shortField", DataTypes.SHORT);
-    fields[3] = new Field("longField", DataTypes.LONG);
-    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
-    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
-    fields[6] = new Field("dateField", DataTypes.DATE);
-    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
-    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
-          .outputPath(path);
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < 100; i++) {
-        String[] row = new String[]{
-            "robot" + (i % 10),
-            String.valueOf(i),
-            String.valueOf(i),
-            String.valueOf(Long.MAX_VALUE - i),
-            String.valueOf((double) i / 2),
-            String.valueOf(true),
-            "2019-03-02",
-            "2019-02-12 03:03:34"
-        };
-        writer.write(row);
-      }
-      writer.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    }
-
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Blocklet() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
-
-    // TODO: implement reader to verify the number of blocklet in the file
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Block() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
-
-    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertEquals(2, dataFiles.length);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testSortColumns() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
-
-    // TODO: implement reader and verify the data is sorted
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testPartitionOutput() {
-    // TODO: test write data with partition
-  }
-
-  @Test
-  public void testSchemaPersistence() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, true);
-
-    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
-    Assert.assertTrue(new File(schemaFile).exists());
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e39b0a14/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
new file mode 100644
index 0000000..2281fe6
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link CSVCarbonWriter}
+ */
+public class CSVCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if want to persist schema file
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data type and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}