You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/01 07:13:22 UTC

carbondata git commit: [CARBONDATA-2313] fixed multiple issues in SDK writer and external table with nonTransactional table data

Repository: carbondata
Updated Branches:
  refs/heads/master 5229443bd -> 4b98af22d


[CARBONDATA-2313] fixed multiple issues in SDK writer and external table with nonTransactional table data

*Header update for sdk interface api
*bad record path issue in sdk writer: the path should not be "null/null/null/taskno"; changed to "SdkWriterBadRecords/taskno"
*Non transactional table, Number format exception was coming instead of bad record exception when load fails due to bad record
*Non transactional table, insert overwrite failure case old files must not be deleted
*Non transactional table, describe formatted path should be files path
*SDK, by default all dimensions must use inverted index encoding
*SDK, avro not supporting float datatype

This closes #2240


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b98af22
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b98af22
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b98af22

Branch: refs/heads/master
Commit: 4b98af22d327125f9b5a011c32be7bc1d48edb98
Parents: 5229443
Author: ajantha-bhat <aj...@gmail.com>
Authored: Mon Apr 23 16:17:29 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 1 12:41:32 2018 +0530

----------------------------------------------------------------------
 .../schema/table/TableSchemaBuilder.java        |  4 +
 .../spark/rdd/CarbonDataRDDFactory.scala        | 15 +++-
 .../table/CarbonDescribeFormattedCommand.scala  |  9 ++-
 .../loading/BadRecordsLoggerProvider.java       | 18 +++--
 .../processing/util/CarbonBadRecordUtil.java    | 14 +++-
 .../processing/util/CarbonLoaderUtil.java       |  3 -
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  1 +
 .../sdk/file/CarbonWriterBuilder.java           | 67 +++++++++++-----
 .../org/apache/carbondata/sdk/file/Field.java   | 10 +++
 .../org/apache/carbondata/sdk/file/Schema.java  |  6 ++
 .../sdk/file/AvroCarbonWriterTest.java          | 80 +++++++++++++++++++-
 11 files changed, 190 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 8187145..617d58f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -139,6 +139,10 @@ public class TableSchemaBuilder {
     } else {
       otherColumns.add(newColumn);
     }
+
+    if (newColumn.isDimensionColumn()) {
+      newColumn.setUseInvertedIndex(true);
+    }
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0b9bd66..6873289 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -471,7 +471,10 @@ object CarbonDataRDDFactory {
       // update the load entry in table status file for changing the status to marked for delete
       CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
       LOGGER.info("********starting clean up**********")
-      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        // delete segment is applicable for transactional table
+        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+      }
       LOGGER.info("********clean up done**********")
       LOGGER.audit(s"Data load is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -486,7 +489,10 @@ object CarbonDataRDDFactory {
         // update the load entry in table status file for changing the status to marked for delete
         CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        if (carbonLoadModel.isCarbonTransactionalTable) {
+          // delete segment is applicable for transactional table
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        }
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -539,7 +545,10 @@ object CarbonDataRDDFactory {
       if (!done || !commitComplete) {
         CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        if (carbonLoadModel.isCarbonTransactionalTable) {
+          // delete segment is applicable for transactional table
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        }
         LOGGER.info("********clean up done**********")
         LOGGER.audit("Data load is failed for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 7e5edd8..9b69373 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -81,12 +81,17 @@ private[sql] case class CarbonDescribeFormattedCommand(
     } else {
       colProps.toString()
     }
+    val carbonTable = relation.carbonTable
     results ++= Seq(("", "", ""), ("##Detailed Table Information", "", ""))
     results ++= Seq(("Database Name", relation.carbonTable.getDatabaseName, "")
     )
     results ++= Seq(("Table Name", relation.carbonTable.getTableName, ""))
-    results ++= Seq(("CARBON Store Path ", CarbonProperties.getStorePath, ""))
-    val carbonTable = relation.carbonTable
+    if (carbonTable.isTransactionalTable) {
+      results ++= Seq(("CARBON Store Path ", CarbonProperties.getStorePath, ""))
+    } else {
+      // for NonTransactional table should show files path.
+      results ++= Seq(("CARBON Store Path ", carbonTable.getTablePath, ""))
+    }
 
     val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
index 614a959..c2ddff8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
@@ -71,13 +71,21 @@ public class BadRecordsLoggerProvider {
     }
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String storeLocation = "";
+    if (configuration.isCarbonTransactionalTable()) {
+      storeLocation =
+          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
+    } else {
+      storeLocation =
+          "SdkWriterBadRecords" + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
+    }
+
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(),
-        getBadLogStoreLocation(configuration,
-            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
-                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
-        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+        getBadLogStoreLocation(configuration, storeLocation), badRecordsLogRedirect,
+        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
   }
 
   public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
index 26a6f77..c494eef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonBadRecordUtil.java
@@ -49,9 +49,17 @@ public class CarbonBadRecordUtil {
     // rename the bad record in progress to normal
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    renameBadRecordsFromInProgressToNormal(configuration,
-        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
-            + configuration.getSegmentId() + File.separator + configuration.getTaskNo());
+    String storeLocation = "";
+    if (configuration.isCarbonTransactionalTable()) {
+      storeLocation =
+          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
+    } else {
+      storeLocation =
+          "SdkWriterBadRecords" + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo();
+    }
+    renameBadRecordsFromInProgressToNormal(configuration, storeLocation);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index f61ca55..6d938e1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -465,9 +465,6 @@ public final class CarbonLoaderUtil {
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
 
-    if (!model.isCarbonTransactionalTable() && insertOverwrite) {
-      CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(model);
-    }
     boolean entryAdded = CarbonLoaderUtil
         .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
     if (!entryAdded) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 046a4ee..458dea8 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -85,6 +85,7 @@ class AvroCarbonWriter extends CarbonWriter {
       case LONG:
       case DOUBLE:
       case STRING:
+      case FLOAT:
         out.append(fieldValue.toString());
         break;
       default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 770c6b0..7ee22ed 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -47,7 +47,7 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 import org.apache.hadoop.fs.s3a.Constants;
 
 /**
- * Biulder for {@link CarbonWriter}
+ * Builder for {@link CarbonWriter}
  */
 @InterfaceAudience.User
 @InterfaceStability.Unstable
@@ -66,6 +66,7 @@ public class CarbonWriterBuilder {
   /**
    * prepares the builder with the schema provided
    * @param schema is instance of Schema
+   * This method must be called when building CarbonWriterBuilder
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withSchema(Schema schema) {
@@ -77,6 +78,7 @@ public class CarbonWriterBuilder {
   /**
    * Sets the output path of the writer builder
    * @param path is the absolute path where output files are written
+   * This method must be called when building CarbonWriterBuilder
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder outputPath(String path) {
@@ -88,8 +90,8 @@ public class CarbonWriterBuilder {
   /**
    * sets the list of columns that needs to be in sorted order
    * @param sortColumns is a string array of columns that needs to be sorted.
-   *                    If it is null, all dimensions are selected for sorting
-   *                    If it is empty array, no columns are sorted
+   * If it is null or by default all dimensions are selected for sorting
+   * If it is empty array, no columns are sorted
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder sortBy(String[] sortColumns) {
@@ -99,8 +101,9 @@ public class CarbonWriterBuilder {
 
   /**
    * sets the taskNo for the writer. SDKs concurrently running
-   * will set taskNo in order to avoid conflits in file write.
-   * @param taskNo is the TaskNo user wants to specify. Mostly it system time.
+   * will set taskNo in order to avoid conflicts in file's name during write.
+   * @param taskNo is the TaskNo user wants to specify.
+   * by default it is system time in nano seconds.
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder taskNo(String taskNo) {
@@ -112,7 +115,8 @@ public class CarbonWriterBuilder {
 
   /**
    * If set, create a schema file in metadata folder.
-   * @param persist is a boolean value, If set, create a schema file in metadata folder
+   * @param persist is a boolean value, If set to true, creates a schema file in metadata folder.
+   * By default set to false. will not create metadata folder
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder persistSchemaFile(boolean persist) {
@@ -122,8 +126,12 @@ public class CarbonWriterBuilder {
 
   /**
    * If set false, writes the carbondata and carbonindex files in a flat folder structure
-   * @param isTransactionalTable is a boolelan value if set to false then writes
-   *                     the carbondata and carbonindex files in a flat folder structure
+   * @param isTransactionalTable is a boolean value
+   * if set to false, then writes the carbondata and carbonindex files
+   * in a flat folder structure.
+   * if set to true, then writes the carbondata and carbonindex files
+   * in segment folder structure.
+   * By default set to false.
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable) {
@@ -201,7 +209,8 @@ public class CarbonWriterBuilder {
 
   /**
    * to set the timestamp in the carbondata and carbonindex index files
-   * @param UUID is a timestamp to be used in the carbondata and carbonindex index files
+   * @param UUID is a timestamp to be used in the carbondata and carbonindex index files.
+   * By default set to zero.
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder uniqueIdentifier(long UUID) {
@@ -213,16 +222,28 @@ public class CarbonWriterBuilder {
   /**
    * To support the load options for sdk writer
    * @param options key,value pair of load options.
-   *                supported keys values are
-   *                a. bad_records_logger_enable -- true (write into separate logs), false
-   *                b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
-   *                c. bad_record_path -- path
-   *                d. dateformat -- same as JAVA SimpleDateFormat
-   *                e. timestampformat -- same as JAVA SimpleDateFormat
-   *                f. complex_delimiter_level_1 -- value to Split the complexTypeData
-   *                g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
-   *                h. quotechar
-   *                i. escapechar
+   * supported keys values are
+   * a. bad_records_logger_enable -- true (write into separate logs), false
+   * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+   * c. bad_record_path -- path
+   * d. dateformat -- same as JAVA SimpleDateFormat
+   * e. timestampformat -- same as JAVA SimpleDateFormat
+   * f. complex_delimiter_level_1 -- value to Split the complexTypeData
+   * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+   * h. quotechar
+   * i. escapechar
+   *
+   * Default values are as follows.
+   *
+   * a. bad_records_logger_enable -- "false"
+   * b. bad_records_action -- "FAIL"
+   * c. bad_record_path -- ""
+   * d. dateformat -- "" , uses from carbon.properties file
+   * e. timestampformat -- "", uses from carbon.properties file
+   * f. complex_delimiter_level_1 -- "$"
+   * g. complex_delimiter_level_2 -- ":"
+   * h. quotechar -- "\""
+   * i. escapechar -- "\\"
    *
    * @return updated CarbonWriterBuilder
    */
@@ -259,6 +280,7 @@ public class CarbonWriterBuilder {
   /**
    * To set the carbondata file size in MB between 1MB-2048MB
    * @param blockSize is size in MB between 1MB to 2048 MB
+   * default value is 1024 MB
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withBlockSize(int blockSize) {
@@ -272,6 +294,7 @@ public class CarbonWriterBuilder {
   /**
    * To set the blocklet size of carbondata file
    * @param blockletSize is blocklet size in MB
+   * default value is 64 MB
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withBlockletSize(int blockletSize) {
@@ -284,6 +307,9 @@ public class CarbonWriterBuilder {
 
   /**
    * Build a {@link CarbonWriter}, which accepts row in CSV format
+   * @return CSVCarbonWriter
+   * @throws IOException
+   * @throws InvalidLoadOptionException
    */
   public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
@@ -294,8 +320,9 @@ public class CarbonWriterBuilder {
 
   /**
    * Build a {@link CarbonWriter}, which accepts Avro object
-   * @return
+   * @return AvroCarbonWriter
    * @throws IOException
+   * @throws InvalidLoadOptionException
    */
   public CarbonWriter buildWriterForAvroInput() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
index 6742fa7..72a3ce4 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Field.java
@@ -32,6 +32,11 @@ public class Field {
   private String name;
   private DataType type;
 
+  /**
+   * Field Constructor
+   * @param name name of the field
+   * @param type datatype of field, specified in strings.
+   */
   public Field(String name, String type) {
     this.name = name;
     if (type.equalsIgnoreCase("string")) {
@@ -59,6 +64,11 @@ public class Field {
     }
   }
 
+  /**
+   * Field constructor
+   * @param name name of the field
+   * @param type datatype of the field of class DataType
+   */
   public Field(String name, DataType type) {
     this.name = name;
     this.type = type;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
index 52a4611..31c202d 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/Schema.java
@@ -36,6 +36,10 @@ public class Schema {
 
   private Field[] fields;
 
+  /**
+   * construct a schema with fields
+   * @param fields
+   */
   public Schema(Field[] fields) {
     this.fields = fields;
   }
@@ -46,6 +50,8 @@ public class Schema {
    *   {"name":"string"},
    *   {"age":"int"}
    * ]
+   * @param json specified as string
+   * @return Schema
    */
   public static Schema parseJson(String json) {
     GsonBuilder gsonBuilder = new GsonBuilder();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b98af22/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index c30bd3a..f85f7d5 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -94,9 +94,87 @@ public class AvroCarbonWriterTest {
 
   @Test
   public void testWriteAllPrimitive() throws IOException {
-    // TODO
+    FileUtils.deleteDirectory(new File(path));
+
+    // Avro schema
+    // Supported Primitive Datatype.
+    // 1. Boolean
+    // 2. Int
+    // 3. long
+    // 4. float -> To carbon Internally it is double.
+    // 5. double
+    // 6. String
+
+    // Not Supported
+    // 1.NULL Datatype
+    // 2.Bytes
+
+    String avroSchema = "{\n" + "  \"name\" : \"myrecord\",\n"
+        + "  \"namespace\": \"org.apache.parquet.avro\",\n" + "  \"type\" : \"record\",\n"
+        + "  \"fields\" : [ "
+        + " {\n" + "    \"name\" : \"myboolean\",\n" + "    \"type\" : \"boolean\"\n  },"
+        + " {\n" + "    \"name\" : \"myint\",\n" + "    \"type\" : \"int\"\n" + "  }, "
+        + " {\n    \"name\" : \"mylong\",\n" + "    \"type\" : \"long\"\n" + "  },"
+        + " {\n   \"name\" : \"myfloat\",\n" + "    \"type\" : \"float\"\n" + "  }, "
+        + " {\n \"name\" : \"mydouble\",\n" + "    \"type\" : \"double\"\n" + "  },"
+        + " {\n \"name\" : \"mystring\",\n" + "    \"type\" : \"string\"\n" + "  }\n" + "] }";
+
+    String json = "{"
+        + "\"myboolean\":true, "
+        + "\"myint\": 10, "
+        + "\"mylong\": 7775656565,"
+        + " \"myfloat\": 0.2, "
+        + "\"mydouble\": 44.56, "
+        + "\"mystring\":\"Ajantha\"}";
+
+
+    // conversion to GenericData.Record
+    JsonAvroConverter converter = new JsonAvroConverter();
+    GenericData.Record record = converter.convertToGenericDataRecord(
+        json.getBytes(CharEncoding.UTF_8), new Schema.Parser().parse(avroSchema));
+
+    Field[] fields = new Field[6];
+    // fields[0] = new Field("mynull", DataTypes.NULL);
+    fields[0] = new Field("myboolean", DataTypes.BOOLEAN);
+    fields[1] = new Field("myint", DataTypes.INT);
+    fields[2] = new Field("mylong", DataTypes.LONG);
+    fields[3] = new Field("myfloat", DataTypes.DOUBLE);
+    fields[4] = new Field("mydouble", DataTypes.DOUBLE);
+    fields[5] = new Field("mystring", DataTypes.STRING);
+
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
+          .outputPath(path)
+          .isTransactionalTable(true)
+          .buildWriterForAvroInput();
+
+      for (int i = 0; i < 100; i++) {
+        writer.write(record);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null"));
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(1, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
   }
 
+
   @Test
   public void testWriteNestedRecord() throws IOException {
     // TODO