You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/18 04:32:29 UTC

carbondata git commit: [CARBONDATA-3073] Support configure TableProperties, withLoadOption etc. interface in carbon writer of C++ SDK support withTableProperty, withLoadOption, taskNo, uniqueIdentifier, withThreadSafe, withBlockSize, withBlockletSize, local

Repository: carbondata
Updated Branches:
  refs/heads/master 38abb6bd9 -> eb52c37c3


[CARBONDATA-3073] Support configure TableProperties,withLoadOption etc.
interface in carbon writer of C++ SDK support withTableProperty,
withLoadOption,taskNo, uniqueIdentifier, withThreadSafe,withBlockSize,
withBlockletSize, localDictionaryThreshold, enableLocalDictionary, sortBy in C++ SDK

support withTableProperty, withLoadOption,taskNo, uniqueIdentifier, withThreadSafe,
withBlockSize, withBlockletSize, localDictionaryThreshold, enableLocalDictionary,sortBy in C++ SDK

This closes #2899


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb52c37c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb52c37c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb52c37c

Branch: refs/heads/master
Commit: eb52c37c38b474c48e620a5bdd3f1cf9d68ffda6
Parents: 38abb6b
Author: xubo245 <xu...@huawei.com>
Authored: Mon Nov 5 19:43:19 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Tue Dec 18 10:01:07 2018 +0530

----------------------------------------------------------------------
 docs/csdk-guide.md                              | 125 +++++
 .../examples/sdk/CarbonReaderExample.java       |   4 +-
 .../examples/sdk/SDKS3ReadExample.java          |  15 +-
 store/CSDK/CMakeLists.txt                       |   6 +-
 store/CSDK/src/CarbonReader.h                   |   4 +-
 store/CSDK/src/CarbonSchemaReader.cpp           |   4 +-
 store/CSDK/src/CarbonWriter.cpp                 | 211 +++++++++
 store/CSDK/src/CarbonWriter.h                   | 105 +++++
 store/CSDK/src/Schema.h                         |   5 -
 store/CSDK/test/main.cpp                        | 468 ++++++++++++-------
 .../sdk/file/CarbonWriterBuilder.java           |  56 ++-
 .../sdk/file/CSVCarbonWriterTest.java           |  38 ++
 12 files changed, 850 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/docs/csdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/csdk-guide.md b/docs/csdk-guide.md
index 6002cf5..f82c4fb 100644
--- a/docs/csdk-guide.md
+++ b/docs/csdk-guide.md
@@ -195,6 +195,18 @@ release the memory and destroy JVM.
 
 ```
     /**
+      * sets the list of columns that needs to be in sorted order
+      *
+      * @param argc argument counter, the number of sort columns
+      * @param argv string array of the columns that need to be sorted.
+      *                  If it is null or by default all dimensions are selected for sorting
+      *                  If it is empty array, no columns are sorted
+      */
+    void sortBy(int argc, char *argv[]);
+```
+
+```
+    /**
      * configure the schema with json style schema
      *
      * @param jsonSchema json style schema
@@ -215,6 +227,119 @@ release the memory and destroy JVM.
 ```
 
 ```
+ /**
+     *  To support the table properties for writer
+     *
+     * @param key properties key
+     * @param value properties value
+     */
+    void withTableProperty(char *key, char *value);
+```
+
+```
+    /**
+     * To support the load options for C++ sdk writer
+     *
+     * @param key load option key
+     * @param value load option value
+     * supported keys and values are
+     * a. bad_records_logger_enable -- true (write into separate logs), false
+     * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+     * c. bad_record_path -- path
+     * d. dateformat -- same as JAVA SimpleDateFormat
+     * e. timestampformat -- same as JAVA SimpleDateFormat
+     * f. complex_delimiter_level_1 -- value to Split the complexTypeData
+     * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+     * h. quotechar
+     * i. escapechar
+     *
+     * Default values are as follows.
+     *
+     * a. bad_records_logger_enable -- "false"
+     * b. bad_records_action -- "FAIL"
+     * c. bad_record_path -- ""
+     * d. dateformat -- "" , uses from carbon.properties file
+     * e. timestampformat -- "", uses from carbon.properties file
+     * f. complex_delimiter_level_1 -- "$"
+     * g. complex_delimiter_level_2 -- ":"
+     * h. quotechar -- "\""
+     * i. escapechar -- "\\"
+     *
+     * @return updated CarbonWriterBuilder
+     */
+    void withLoadOption(char *key, char *value);
+```
+
+```
+    /**
+     * sets the taskNo for the writer. CSDKs concurrently running
+     * will set taskNo in order to avoid conflicts in file's name during write.
+     *
+     * @param taskNo is the TaskNo user wants to specify.
+     *               by default it is system time in nano seconds.
+     */
+    void taskNo(long taskNo);
+```
+
+```
+    /**
+     * to set the timestamp in the carbondata and carbonindex index files
+     *
+     * @param timestamp is a timestamp to be used in the carbondata and carbonindex index files.
+     * By default set to zero.
+     * @return updated CarbonWriterBuilder
+     */
+    void uniqueIdentifier(long timestamp);
+```
+
+```
+    /**
+     * To make c++ sdk writer thread safe.
+     *
+     * @param numOfThreads number of threads in which the writer is called in a multi-thread scenario.
+     *                      By default the C++ SDK writer is not thread safe,
+     *                      so one writer instance can be used in one thread only.
+     */
+    void withThreadSafe(short numOfThreads) ;
+```
+
+```
+    /**
+     * To set the carbondata file size in MB between 1MB-2048MB
+     *
+     * @param blockSize is size in MB between 1MB to 2048 MB
+     * default value is 1024 MB
+     */
+    void withBlockSize(int blockSize);
+```
+
+```
+    /**
+     * To set the blocklet size of CarbonData file
+     *
+     * @param blockletSize is blocklet size in MB
+     *        default value is 64 MB
+     * @return updated CarbonWriterBuilder
+     */
+    void withBlockletSize(int blockletSize);
+```
+
+```
+    /**
+     * @param localDictionaryThreshold is localDictionaryThreshold, default is 10000
+     * @return updated CarbonWriterBuilder
+     */
+    void localDictionaryThreshold(int localDictionaryThreshold);
+```
+
+```
+    /**
+     * @param enableLocalDictionary enable local dictionary, default is false
+     * @return updated CarbonWriterBuilder
+     */
+    void enableLocalDictionary(bool enableLocalDictionary);
+```
+
+```
     /**
      * @param appName appName which is writing the carbondata files
      */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index c730070..dab86e6 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -56,11 +56,9 @@ public class CarbonReaderExample {
             fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
             fields[9] = new Field("varcharField", DataTypes.VARCHAR);
             fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
-            Map<String, String> map = new HashMap<>();
-            map.put("complex_delimiter_level_1", "#");
             CarbonWriter writer = CarbonWriter.builder()
                 .outputPath(path)
-                .withLoadOptions(map)
+                .withLoadOption("complex_delimiter_level_1", "#")
                 .withCsvInput(new Schema(fields))
                 .writtenBy("CarbonReaderExample")
                 .build();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
index 2462d8d..94e4c8d 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
@@ -43,26 +43,23 @@ public class SDKS3ReadExample {
             System.exit(0);
         }
 
-        String path = "s3a://sdk/WriterOutput";
+        String path = "s3a://sdk/WriterOutput/carbondata5";
         if (args.length > 3) {
             path=args[3];
         }
 
         // Read data
-
         EqualToExpression equalToExpression = new EqualToExpression(
             new ColumnExpression("name", DataTypes.STRING),
             new LiteralExpression("robot1", DataTypes.STRING));
 
-        Configuration configuration = new Configuration();
-        configuration.set(ACCESS_KEY, args[0]);
-        configuration.set(SECRET_KEY, args[1]);
-        configuration.set(ENDPOINT, args[2]);
         CarbonReader reader = CarbonReader
             .builder(path, "_temp")
             .projection(new String[]{"name", "age"})
             .filter(equalToExpression)
-            .withHadoopConf(configuration)
+            .withHadoopConf(ACCESS_KEY, args[0])
+            .withHadoopConf(SECRET_KEY, args[1])
+            .withHadoopConf(ENDPOINT, args[2])
             .build();
 
         System.out.println("\nData:");
@@ -79,7 +76,9 @@ public class SDKS3ReadExample {
         CarbonReader reader2 = CarbonReader
             .builder(path, "_temp")
             .projection(new String[]{"name", "age"})
-            .withHadoopConf(configuration)
+            .withHadoopConf(ACCESS_KEY, args[0])
+            .withHadoopConf(SECRET_KEY, args[1])
+            .withHadoopConf(ENDPOINT, args[2])
             .build();
 
         System.out.println("\nData:");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt
index 137e6fd..d35ebad 100644
--- a/store/CSDK/CMakeLists.txt
+++ b/store/CSDK/CMakeLists.txt
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 cmake_minimum_required(VERSION 2.8)
-project(CJDK)
+project(CSDK)
 set(CMAKE_BUILD_TYPE Debug)
 SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH true)
 
@@ -27,8 +27,8 @@ set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/Carbo
         src/CarbonRow.cpp src/CarbonWriter.h  src/CarbonWriter.cpp src/CarbonSchemaReader.h
         src/CarbonSchemaReader.cpp src/Schema.h src/Schema.cpp src/CarbonProperties.cpp src/CarbonProperties.h)
 
-add_executable(CJDK ${SOURCE_FILES})
+add_executable(CSDK ${SOURCE_FILES})
 get_filename_component(JAVA_JVM_LIBRARY_DIR ${JAVA_JVM_LIBRARY} DIRECTORY)
 message(${JAVA_JVM_LIBRARY_DIR})
-target_link_libraries(CJDK ${JAVA_JVM_LIBRARY})
+target_link_libraries(CSDK ${JAVA_JVM_LIBRARY})
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonReader.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonReader.h b/store/CSDK/src/CarbonReader.h
index 0b4a82e..50cc851 100644
--- a/store/CSDK/src/CarbonReader.h
+++ b/store/CSDK/src/CarbonReader.h
@@ -94,8 +94,8 @@ public:
     /**
      * Configure the projection column names of carbon reader
      *
-     * @param argc argument counter
-     * @param argv argument vector
+     * @param argc argument counter, the number of projection column
+     * @param argv argument vector, projection column names
      */
     void projection(int argc, char *argv[]);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonSchemaReader.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonSchemaReader.cpp b/store/CSDK/src/CarbonSchemaReader.cpp
index c895f4b..2743bfe 100644
--- a/store/CSDK/src/CarbonSchemaReader.cpp
+++ b/store/CSDK/src/CarbonSchemaReader.cpp
@@ -34,7 +34,7 @@ jobject CarbonSchemaReader::readSchema(char *path) {
         throw std::runtime_error("path parameter can't be NULL.");
     }
     jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
-                                                   "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+        "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
     if (methodID == NULL) {
         throw std::runtime_error("Can't find the method in java: readSchema");
     }
@@ -53,7 +53,7 @@ jobject CarbonSchemaReader::readSchema(char *path, bool validateSchema) {
         throw std::runtime_error("path parameter can't be NULL.");
     }
     jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
-                                                   "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+        "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
     if (methodID == NULL) {
         throw std::runtime_error("Can't find the method in java: readSchema");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonWriter.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonWriter.cpp b/store/CSDK/src/CarbonWriter.cpp
index 2a4bcc7..6ce5d7d 100644
--- a/store/CSDK/src/CarbonWriter.cpp
+++ b/store/CSDK/src/CarbonWriter.cpp
@@ -58,6 +58,38 @@ void CarbonWriter::outputPath(char *path) {
     carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
 }
 
+/**
+ * Sets the columns the written data is sorted by, by forwarding them to the
+ * Java CarbonWriterBuilder.sortBy(String[]) method.
+ *
+ * @param argc number of entries in argv, must not be negative
+ * @param argv column names to sort by, must not be NULL
+ */
+void CarbonWriter::sortBy(int argc, char **argv) {
+    if (argc < 0) {
+        throw std::runtime_error("argc parameter can't be negative.");
+    }
+    if (argv == NULL) {
+        throw std::runtime_error("argv parameter can't be NULL.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "sortBy",
+        "([Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: sortBy");
+    }
+    // FindClass expects a plain class name ("java/lang/String"); the field-descriptor
+    // form ("Ljava/lang/String;") is not a valid class name per the JNI specification.
+    jclass stringClass = jniEnv->FindClass("java/lang/String");
+    if (stringClass == NULL) {
+        throw std::runtime_error("Can't find the class in java: java/lang/String");
+    }
+    jobjectArray array = jniEnv->NewObjectArray(argc, stringClass, NULL);
+    if (array == NULL) {
+        throw std::runtime_error("Can't create the java String array for sortBy");
+    }
+    for (int i = 0; i < argc; ++i) {
+        jstring value = jniEnv->NewStringUTF(argv[i]);
+        jniEnv->SetObjectArrayElement(array, i, value);
+        // The array keeps its own reference to the element; release the local one
+        // so a long column list does not pile up JNI local references.
+        if (value != NULL) {
+            jniEnv->DeleteLocalRef(value);
+        }
+    }
+
+    jvalue args[1];
+    args[0].l = array;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // Surface any pending Java exception to the C++ caller as a jthrowable.
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
 void CarbonWriter::withCsvInput(char *jsonSchema) {
     if (jsonSchema == NULL) {
         throw std::runtime_error("jsonSchema parameter can't be NULL.");
@@ -98,6 +130,185 @@ void CarbonWriter::withHadoopConf(char *key, char *value) {
     carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
 }
 
+// Forwards one table property (key/value pair) to the Java builder's
+// withTableProperty(String, String) and keeps the builder object it returns.
+// Throws std::runtime_error for a NULL argument or a missing Java method, and
+// rethrows a pending Java exception as a jthrowable.
+void CarbonWriter::withTableProperty(char *key, char *value) {
+    if (!key) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (!value) {
+        throw std::runtime_error("value parameter can't be NULL.");
+    }
+    checkBuilder();
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID withTablePropertyMethod = jniEnv->GetMethodID(builderClass, "withTableProperty",
+        "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (!withTablePropertyMethod) {
+        throw std::runtime_error("Can't find the method in java: withTableProperty");
+    }
+    jstring javaKey = jniEnv->NewStringUTF(key);
+    jstring javaValue = jniEnv->NewStringUTF(value);
+    jvalue callArgs[2];
+    callArgs[0].l = javaKey;
+    callArgs[1].l = javaValue;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject,
+        withTablePropertyMethod, callArgs);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+// Forwards one load option (key/value pair) to the Java builder's
+// withLoadOption(String, String) and keeps the builder object it returns.
+// Throws std::runtime_error for a NULL argument or a missing Java method, and
+// rethrows a pending Java exception as a jthrowable.
+void CarbonWriter::withLoadOption(char *key, char *value) {
+    if (!key) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (!value) {
+        throw std::runtime_error("value parameter can't be NULL.");
+    }
+    checkBuilder();
+    const char *signature =
+        "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID withLoadOptionMethod = jniEnv->GetMethodID(builderClass, "withLoadOption", signature);
+    if (!withLoadOptionMethod) {
+        throw std::runtime_error("Can't find the method in java: withLoadOption");
+    }
+    jvalue callArgs[2];
+    callArgs[0].l = jniEnv->NewStringUTF(key);
+    callArgs[1].l = jniEnv->NewStringUTF(value);
+    jobject updatedBuilder = jniEnv->CallObjectMethodA(carbonWriterBuilderObject,
+        withLoadOptionMethod, callArgs);
+    carbonWriterBuilderObject = updatedBuilder;
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+// Passes the task number to the Java builder's taskNo(long) and stores the
+// builder object returned by that call. Negative values are rejected with
+// std::runtime_error; a pending Java exception is rethrown as a jthrowable.
+void CarbonWriter::taskNo(long taskNo) {
+    if (taskNo < 0) {
+        throw std::runtime_error("taskNo parameter can't be negative.");
+    }
+    checkBuilder();
+    const char *signature = "(J)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID taskNoMethod = jniEnv->GetMethodID(builderClass, "taskNo", signature);
+    if (!taskNoMethod) {
+        throw std::runtime_error("Can't find the method in java: taskNo");
+    }
+    jvalue callArgs[1];
+    callArgs[0].j = taskNo;
+    jobject updatedBuilder = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, taskNoMethod, callArgs);
+    carbonWriterBuilderObject = updatedBuilder;
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Passes the timestamp to the Java CarbonWriterBuilder.uniqueIdentifier(long)
+ * method and keeps the builder object it returns.
+ *
+ * @param timestamp timestamp to forward; values below 1 are rejected
+ */
+void CarbonWriter::uniqueIdentifier(long timestamp) {
+    // The guard rejects zero as well as negative values, so the message says
+    // "positive" rather than "can't be negative".
+    if (timestamp < 1) {
+        throw std::runtime_error("timestamp parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "uniqueIdentifier",
+        "(J)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: uniqueIdentifier");
+    }
+    jvalue args[1];
+    args[0].j = timestamp;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // Surface any pending Java exception to the C++ caller as a jthrowable.
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Passes the thread count to the Java CarbonWriterBuilder.withThreadSafe(short)
+ * method and keeps the builder object it returns.
+ *
+ * @param numOfThreads number of threads to forward; values below 1 are rejected
+ */
+void CarbonWriter::withThreadSafe(short numOfThreads) {
+    // The guard rejects zero as well as negative values, so the message says
+    // "positive" rather than "can't be negative".
+    if (numOfThreads < 1) {
+        throw std::runtime_error("numOfThreads parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withThreadSafe",
+        "(S)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withThreadSafe");
+    }
+    jvalue args[1];
+    args[0].s = numOfThreads;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // Surface any pending Java exception to the C++ caller as a jthrowable.
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+// Passes the block size to the Java builder's withBlockSize(int) and stores
+// the builder object returned by that call. Non-positive sizes are rejected
+// with std::runtime_error; a pending Java exception is rethrown as a jthrowable.
+void CarbonWriter::withBlockSize(int blockSize) {
+    if (blockSize <= 0) {
+        throw std::runtime_error("blockSize parameter should be positive number.");
+    }
+    checkBuilder();
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID setter = jniEnv->GetMethodID(builderClass, "withBlockSize", signature);
+    if (!setter) {
+        throw std::runtime_error("Can't find the method in java: withBlockSize");
+    }
+    jvalue callArgs[1];
+    callArgs[0].i = blockSize;
+    jobject updatedBuilder = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, setter, callArgs);
+    carbonWriterBuilderObject = updatedBuilder;
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+// Passes the blocklet size to the Java builder's withBlockletSize(int) and
+// stores the builder object returned by that call. Non-positive sizes are
+// rejected with std::runtime_error; a pending Java exception is rethrown as a
+// jthrowable.
+void CarbonWriter::withBlockletSize(int blockletSize) {
+    if (blockletSize <= 0) {
+        throw std::runtime_error("blockletSize parameter should be positive number.");
+    }
+    checkBuilder();
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID setter = jniEnv->GetMethodID(builderClass, "withBlockletSize", signature);
+    if (!setter) {
+        throw std::runtime_error("Can't find the method in java: withBlockletSize");
+    }
+    jvalue callArgs[1];
+    callArgs[0].i = blockletSize;
+    jobject updatedBuilder = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, setter, callArgs);
+    carbonWriterBuilderObject = updatedBuilder;
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+// Passes the threshold to the Java builder's localDictionaryThreshold(int) and
+// stores the builder object returned by that call. Non-positive thresholds are
+// rejected with std::runtime_error; a pending Java exception is rethrown as a
+// jthrowable.
+void CarbonWriter::localDictionaryThreshold(int localDictionaryThreshold) {
+    if (localDictionaryThreshold <= 0) {
+        throw std::runtime_error("localDictionaryThreshold parameter should be positive number.");
+    }
+    checkBuilder();
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID setter = jniEnv->GetMethodID(builderClass, "localDictionaryThreshold", signature);
+    if (!setter) {
+        throw std::runtime_error("Can't find the method in java: localDictionaryThreshold");
+    }
+    jvalue callArgs[1];
+    callArgs[0].i = localDictionaryThreshold;
+    jobject updatedBuilder = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, setter, callArgs);
+    carbonWriterBuilderObject = updatedBuilder;
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Passes the flag to the Java CarbonWriterBuilder.enableLocalDictionary(boolean)
+ * method and keeps the builder object it returns.
+ *
+ * @param enableLocalDictionary true to enable the local dictionary, false to disable it
+ */
+void CarbonWriter::enableLocalDictionary(bool enableLocalDictionary) {
+    // No NULL check here: a bool is never NULL, and comparing it with NULL is
+    // true for `false`, which made it impossible to disable the local dictionary.
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "enableLocalDictionary",
+        "(Z)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: enableLocalDictionary");
+    }
+    jvalue args[1];
+    args[0].z = enableLocalDictionary;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // Surface any pending Java exception to the C++ caller as a jthrowable.
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
 void CarbonWriter::writtenBy(char *appName) {
     checkBuilder();
     jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonWriter.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonWriter.h b/store/CSDK/src/CarbonWriter.h
index a822432..fdb0bc6 100644
--- a/store/CSDK/src/CarbonWriter.h
+++ b/store/CSDK/src/CarbonWriter.h
@@ -77,6 +77,16 @@ public:
      */
     void outputPath(char *path);
 
+     /**
+      * sets the list of columns that needs to be in sorted order
+      *
+      * @param argc argument counter, the number of sort columns
+      * @param argv string array of the columns that need to be sorted.
+      *                  If it is null or by default all dimensions are selected for sorting
+      *                  If it is empty array, no columns are sorted
+      */
+    void sortBy(int argc, char *argv[]);
+
     /**
      * configure the schema with json style schema
      *
@@ -95,6 +105,101 @@ public:
     void withHadoopConf(char *key, char *value);
 
     /**
+     *  To support the table properties for writer
+     *
+     * @param key properties key
+     * @param value properties value
+     */
+    void withTableProperty(char *key, char *value);
+
+    /**
+     * To support the load options for C++ sdk writer
+     *
+     * @param key load option key
+     * @param value load option value
+     * supported keys and values are
+     * a. bad_records_logger_enable -- true (write into separate logs), false
+     * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+     * c. bad_record_path -- path
+     * d. dateformat -- same as JAVA SimpleDateFormat
+     * e. timestampformat -- same as JAVA SimpleDateFormat
+     * f. complex_delimiter_level_1 -- value to Split the complexTypeData
+     * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+     * h. quotechar
+     * i. escapechar
+     *
+     * Default values are as follows.
+     *
+     * a. bad_records_logger_enable -- "false"
+     * b. bad_records_action -- "FAIL"
+     * c. bad_record_path -- ""
+     * d. dateformat -- "" , uses from carbon.properties file
+     * e. timestampformat -- "", uses from carbon.properties file
+     * f. complex_delimiter_level_1 -- "$"
+     * g. complex_delimiter_level_2 -- ":"
+     * h. quotechar -- "\""
+     * i. escapechar -- "\\"
+     *
+     * @return updated CarbonWriterBuilder
+     */
+    void withLoadOption(char *key, char *value);
+
+    /**
+     * sets the taskNo for the writer. CSDKs concurrently running
+     * will set taskNo in order to avoid conflicts in file's name during write.
+     *
+     * @param taskNo is the TaskNo user wants to specify.
+     *               by default it is system time in nano seconds.
+     */
+    void taskNo(long taskNo);
+
+    /**
+     * to set the timestamp in the carbondata and carbonindex index files
+     *
+     * @param timestamp is a timestamp to be used in the carbondata and carbonindex index files.
+     * By default set to zero.
+     * @return updated CarbonWriterBuilder
+     */
+    void uniqueIdentifier(long timestamp);
+
+    /**
+     * To make c++ sdk writer thread safe.
+     *
+     * @param numOfThreads number of threads in which the writer is called in a multi-thread scenario.
+     *                      By default the C++ SDK writer is not thread safe,
+     *                      so one writer instance can be used in one thread only.
+     */
+    void withThreadSafe(short numOfThreads) ;
+
+    /**
+     * To set the carbondata file size in MB between 1MB-2048MB
+     *
+     * @param blockSize is size in MB between 1MB to 2048 MB
+     * default value is 1024 MB
+     */
+    void withBlockSize(int blockSize);
+
+    /**
+     * To set the blocklet size of CarbonData file
+     *
+     * @param blockletSize is blocklet size in MB
+     *        default value is 64 MB
+     * @return updated CarbonWriterBuilder
+     */
+    void withBlockletSize(int blockletSize);
+
+    /**
+     * @param localDictionaryThreshold is localDictionaryThreshold, default is 10000
+     * @return updated CarbonWriterBuilder
+     */
+    void localDictionaryThreshold(int localDictionaryThreshold);
+
+    /**
+     * @param enableLocalDictionary enable local dictionary, default is false
+     * @return updated CarbonWriterBuilder
+     */
+    void enableLocalDictionary(bool enableLocalDictionary);
+
+    /**
      * @param appName appName which is writing the carbondata files
      */
     void writtenBy(char *appName);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/Schema.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/Schema.h b/store/CSDK/src/Schema.h
index 67434ef..1306f46 100644
--- a/store/CSDK/src/Schema.h
+++ b/store/CSDK/src/Schema.h
@@ -17,11 +17,6 @@
 
 #include <jni.h>
 
-#ifndef CJDK_SCHEMA_H
-#define CJDK_SCHEMA_H
-
-#endif //CJDK_SCHEMA_H
-
 class Schema {
 private:
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/test/main.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp
index 5ab3976..140420b 100644
--- a/store/CSDK/test/main.cpp
+++ b/store/CSDK/test/main.cpp
@@ -178,9 +178,9 @@ void printResult(JNIEnv *env, CarbonReader reader) {
  */
 bool readSchema(JNIEnv *env, char *Path, bool validateSchema) {
     printf("\nread Schema from Index File:\n");
-    CarbonSchemaReader carbonSchemaReader(env);
-    jobject schema;
     try {
+        CarbonSchemaReader carbonSchemaReader(env);
+        jobject schema;
         if (validateSchema) {
             schema = carbonSchemaReader.readSchema(Path, validateSchema);
         } else {
@@ -247,6 +247,7 @@ bool readFromLocalWithoutProjection(JNIEnv *env, char *path) {
         carbonReaderClass.build();
     } catch (jthrowable e) {
         env->ExceptionDescribe();
+        env->ExceptionClear();
     }
     printResult(env, carbonReaderClass);
 }
@@ -261,10 +262,9 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
     printBoolean(useVectorReader);
     printf("\n");
 
-    struct timeval start, build, startRead, endBatchRead, endRead;
-    gettimeofday(&start, NULL);
-
     try {
+        struct timeval start, build, startRead, endBatchRead, endRead;
+        gettimeofday(&start, NULL);
         CarbonReader carbonReaderClass;
 
         carbonReaderClass.builder(env, path);
@@ -323,6 +323,7 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
         carbonRow.close();
     } catch (jthrowable) {
         env->ExceptionDescribe();
+        env->ExceptionClear();
     }
 }
 
@@ -332,98 +333,100 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
  * @param env  jni env
  */
 void testReadNextBatchRow(JNIEnv *env, char *path, int batchSize, int printNum, char **argv, int argc,
-                          bool useVectorReader) {
-    printf("\n\nTest next Batch Row Performance:\n");
-    printBoolean(useVectorReader);
-    printf("\n");
+         bool useVectorReader) {
+    try {
+        printf("\n\nTest next Batch Row Performance:\n");
+        printBoolean(useVectorReader);
+        printf("\n");
 
-    struct timeval start, build, read;
-    gettimeofday(&start, NULL);
+        struct timeval start, build, read;
+        gettimeofday(&start, NULL);
 
-    CarbonReader carbonReaderClass;
+        CarbonReader carbonReaderClass;
 
-    carbonReaderClass.builder(env, path);
-    if (argc > 1) {
-        carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
-        carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
-        carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
-    }
-    if (!useVectorReader) {
-        carbonReaderClass.withRowRecordReader();
-    }
-    carbonReaderClass.withBatch(batchSize);
-    try {
+        carbonReaderClass.builder(env, path);
+        if (argc > 1) {
+            carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
+            carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
+            carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
+        }
+        if (!useVectorReader) {
+            carbonReaderClass.withRowRecordReader();
+        }
+        carbonReaderClass.withBatch(batchSize);
         carbonReaderClass.build();
-    } catch (jthrowable e) {
-        env->ExceptionDescribe();
-    }
-
-    gettimeofday(&build, NULL);
-    int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
-    double buildTime = time / 1000000.0;
-    printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
 
-    CarbonRow carbonRow(env);
-    int i = 0;
-    struct timeval startHasNext, startReadNextBatchRow, endReadNextBatchRow, endRead;
-    gettimeofday(&startHasNext, NULL);
+        gettimeofday(&build, NULL);
+        int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
+        double buildTime = time / 1000000.0;
+        printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
 
-    while (carbonReaderClass.hasNext()) {
+        CarbonRow carbonRow(env);
+        int i = 0;
+        struct timeval startHasNext, startReadNextBatchRow, endReadNextBatchRow, endRead;
+        gettimeofday(&startHasNext, NULL);
 
-        gettimeofday(&startReadNextBatchRow, NULL);
-        jobjectArray batch = carbonReaderClass.readNextBatchRow();
-        if (env->ExceptionCheck()) {
-            env->ExceptionDescribe();
-        }
-        gettimeofday(&endReadNextBatchRow, NULL);
+        while (carbonReaderClass.hasNext()) {
 
-        jsize length = env->GetArrayLength(batch);
-        if (i + length > printNum - 1) {
-            for (int j = 0; j < length; j++) {
-                i++;
-                jobject row = env->GetObjectArrayElement(batch, j);
-                carbonRow.setCarbonRow(row);
-                carbonRow.getString(0);
-                carbonRow.getString(1);
-                carbonRow.getString(2);
-                carbonRow.getString(3);
-                carbonRow.getLong(4);
-                carbonRow.getLong(5);
-                if (i > 1 && i % printNum == 0) {
-                    gettimeofday(&read, NULL);
-
-                    double hasNextTime = 1000000 * (startReadNextBatchRow.tv_sec - startHasNext.tv_sec) +
-                                         startReadNextBatchRow.tv_usec - startHasNext.tv_usec;
-
-                    double readNextBatchTime = 1000000 * (endReadNextBatchRow.tv_sec - startReadNextBatchRow.tv_sec) +
-                                               endReadNextBatchRow.tv_usec - startReadNextBatchRow.tv_usec;
-
-                    time = 1000000 * (read.tv_sec - startHasNext.tv_sec) + read.tv_usec - startHasNext.tv_usec;
-                    printf("%d: time is %lf s, speed is %lf records/s, hasNext time is %lf s,readNextBatchRow time is %lf s ",
-                           i, time / 1000000.0, printNum / (time / 1000000.0), hasNextTime / 1000000.0,
-                           readNextBatchTime / 1000000.0);
-                    gettimeofday(&startHasNext, NULL);
-                    printf("%s\t", carbonRow.getString(0));
-                    printf("%s\t", carbonRow.getString(1));
-                    printf("%s\t", carbonRow.getString(2));
-                    printf("%s\t", carbonRow.getString(3));
-                    printf("%ld\t", carbonRow.getLong(4));
-                    printf("%ld\t", carbonRow.getLong(5));
-                    printf("\n");
+            gettimeofday(&startReadNextBatchRow, NULL);
+            jobjectArray batch = carbonReaderClass.readNextBatchRow();
+            if (env->ExceptionCheck()) {
+                env->ExceptionDescribe();
+            }
+            gettimeofday(&endReadNextBatchRow, NULL);
+
+            jsize length = env->GetArrayLength(batch);
+            if (i + length > printNum - 1) {
+                for (int j = 0; j < length; j++) {
+                    i++;
+                    jobject row = env->GetObjectArrayElement(batch, j);
+                    carbonRow.setCarbonRow(row);
+                    carbonRow.getString(0);
+                    carbonRow.getString(1);
+                    carbonRow.getString(2);
+                    carbonRow.getString(3);
+                    carbonRow.getLong(4);
+                    carbonRow.getLong(5);
+                    if (i > 1 && i % printNum == 0) {
+                        gettimeofday(&read, NULL);
+
+                        double hasNextTime = 1000000 * (startReadNextBatchRow.tv_sec - startHasNext.tv_sec) +
+                                             startReadNextBatchRow.tv_usec - startHasNext.tv_usec;
+
+                        double readNextBatchTime =
+                                1000000 * (endReadNextBatchRow.tv_sec - startReadNextBatchRow.tv_sec) +
+                                endReadNextBatchRow.tv_usec - startReadNextBatchRow.tv_usec;
+
+                        time = 1000000 * (read.tv_sec - startHasNext.tv_sec) + read.tv_usec - startHasNext.tv_usec;
+                        printf("%d: time is %lf s, speed is %lf records/s, hasNext time is %lf s,readNextBatchRow time is %lf s ",
+                               i, time / 1000000.0, printNum / (time / 1000000.0), hasNextTime / 1000000.0,
+                               readNextBatchTime / 1000000.0);
+                        gettimeofday(&startHasNext, NULL);
+                        printf("%s\t", carbonRow.getString(0));
+                        printf("%s\t", carbonRow.getString(1));
+                        printf("%s\t", carbonRow.getString(2));
+                        printf("%s\t", carbonRow.getString(3));
+                        printf("%ld\t", carbonRow.getLong(4));
+                        printf("%ld\t", carbonRow.getLong(5));
+                        printf("\n");
+                    }
+                    env->DeleteLocalRef(row);
                 }
-                env->DeleteLocalRef(row);
+            } else {
+                i = i + length;
             }
-        } else {
-            i = i + length;
+            env->DeleteLocalRef(batch);
         }
-        env->DeleteLocalRef(batch);
+        gettimeofday(&endRead, NULL);
+        time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
+        printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s  ",
+               i, buildTime, time / 1000000.0, i / (time / 1000000.0));
+        carbonReaderClass.close();
+        carbonRow.close();
+    } catch (jthrowable e) {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
     }
-    gettimeofday(&endRead, NULL);
-    time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
-    printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s  ",
-           i, buildTime, time / 1000000.0, i / (time / 1000000.0));
-    carbonReaderClass.close();
-    carbonRow.close();
 }
 
 /**
@@ -434,54 +437,54 @@ void testReadNextBatchRow(JNIEnv *env, char *path, int batchSize, int printNum,
  */
 bool readFromLocalWithProjection(JNIEnv *env, char *path) {
     printf("\nRead data from local:\n");
+    try {
+        CarbonReader reader;
+        reader.builder(env, path, "test");
+
+        char *argv[12];
+        argv[0] = "stringField";
+        argv[1] = "shortField";
+        argv[2] = "intField";
+        argv[3] = "longField";
+        argv[4] = "doubleField";
+        argv[5] = "boolField";
+        argv[6] = "dateField";
+        argv[7] = "timeField";
+        argv[8] = "decimalField";
+        argv[9] = "varcharField";
+        argv[10] = "arrayField";
+        argv[11] = "floatField";
+        reader.projection(12, argv);
 
-    CarbonReader reader;
-    reader.builder(env, path, "test");
+        reader.build();
 
-    char *argv[12];
-    argv[0] = "stringField";
-    argv[1] = "shortField";
-    argv[2] = "intField";
-    argv[3] = "longField";
-    argv[4] = "doubleField";
-    argv[5] = "boolField";
-    argv[6] = "dateField";
-    argv[7] = "timeField";
-    argv[8] = "decimalField";
-    argv[9] = "varcharField";
-    argv[10] = "arrayField";
-    argv[11] = "floatField";
-    reader.projection(12, argv);
+        CarbonRow carbonRow(env);
+        while (reader.hasNext()) {
+            jobject row = reader.readNextRow();
+            carbonRow.setCarbonRow(row);
 
-    try {
-        reader.build();
+            printf("%s\t", carbonRow.getString(0));
+            printf("%d\t", carbonRow.getShort(1));
+            printf("%d\t", carbonRow.getInt(2));
+            printf("%ld\t", carbonRow.getLong(3));
+            printf("%lf\t", carbonRow.getDouble(4));
+            printBoolean(carbonRow.getBoolean(5));
+            printf("%d\t", carbonRow.getInt(6));
+            printf("%ld\t", carbonRow.getLong(7));
+            printf("%s\t", carbonRow.getDecimal(8));
+            printf("%s\t", carbonRow.getVarchar(9));
+            printArray(env, carbonRow.getArray(10));
+            printf("%f\t", carbonRow.getFloat(11));
+            printf("\n");
+            env->DeleteLocalRef(row);
+        }
+
+        reader.close();
+        carbonRow.close();
     } catch (jthrowable e) {
         env->ExceptionDescribe();
+        env->ExceptionClear();
     }
-
-    CarbonRow carbonRow(env);
-    while (reader.hasNext()) {
-        jobject row = reader.readNextRow();
-        carbonRow.setCarbonRow(row);
-
-        printf("%s\t", carbonRow.getString(0));
-        printf("%d\t", carbonRow.getShort(1));
-        printf("%d\t", carbonRow.getInt(2));
-        printf("%ld\t", carbonRow.getLong(3));
-        printf("%lf\t", carbonRow.getDouble(4));
-        printBoolean(carbonRow.getBoolean(5));
-        printf("%d\t", carbonRow.getInt(6));
-        printf("%ld\t", carbonRow.getLong(7));
-        printf("%s\t", carbonRow.getDecimal(8));
-        printf("%s\t", carbonRow.getVarchar(9));
-        printArray(env, carbonRow.getArray(10));
-        printf("%f\t", carbonRow.getFloat(11));
-        printf("\n");
-        env->DeleteLocalRef(row);
-    }
-
-    reader.close();
-    carbonRow.close();
 }
 
 
@@ -500,17 +503,23 @@ bool tryCatchException(JNIEnv *env) {
 }
 
 void testCarbonProperties(JNIEnv *env) {
-    printf("%s", "test Carbon Properties:");
-    CarbonProperties carbonProperties(env);
-    char *key = "carbon.unsafe.working.memory.in.mb";
-    printf("%s\t", carbonProperties.getProperty(key));
-    printf("%s\t", carbonProperties.getProperty(key, "512"));
-    carbonProperties.addProperty(key, "1024");
-    printf("%s\t", carbonProperties.getProperty(key));
+    try {
+        printf("%s", "test Carbon Properties:");
+        CarbonProperties carbonProperties(env);
+        char *key = "carbon.unsafe.working.memory.in.mb";
+        printf("%s\t", carbonProperties.getProperty(key));
+        printf("%s\t", carbonProperties.getProperty(key, "512"));
+        carbonProperties.addProperty(key, "1024");
+        printf("%s\t", carbonProperties.getProperty(key));
+    } catch (jthrowable e) {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+    }
 }
 
 /**
- * test write data to local disk
+ * test write data
+ * test WithLoadOption interface
  *
  * @param env  jni env
  * @param path file path
@@ -526,7 +535,15 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
         writer.builder(env);
         writer.outputPath(path);
         writer.withCsvInput(jsonSchema);
+        writer.withLoadOption("complex_delimiter_level_1", "#");
         writer.writtenBy("CSDK");
+        writer.taskNo(185);
+        writer.withThreadSafe(1);
+        writer.uniqueIdentifier(1549911814000000);
+        writer.withBlockSize(1);
+        writer.withBlockletSize(16);
+        writer.enableLocalDictionary(true);
+        writer.localDictionaryThreshold(10000);
         if (argc > 3) {
             writer.withHadoopConf("fs.s3a.access.key", argv[1]);
             writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
@@ -534,7 +551,7 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
         }
         writer.build();
 
-        int rowNum = 10;
+        int rowNum = 70000;
         int size = 10;
         long longValue = 0;
         double doubleValue = 0;
@@ -610,35 +627,163 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
         carbonReader.builder(env, path);
         carbonReader.build();
         int i = 0;
+        int printNum = 10;
         CarbonRow carbonRow(env);
         while (carbonReader.hasNext()) {
             jobject row = carbonReader.readNextRow();
             i++;
             carbonRow.setCarbonRow(row);
-            printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2));
-            jobjectArray array1 = carbonRow.getArray(3);
-            jsize length = env->GetArrayLength(array1);
-            int j = 0;
-            for (j = 0; j < length; j++) {
-                jobject element = env->GetObjectArrayElement(array1, j);
-                char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE);
-                printf("%s\t", str);
-            }
-            printf("%d\t", carbonRow.getShort(4));
-            printf("%d\t", carbonRow.getInt(5));
-            printf("%ld\t", carbonRow.getLong(6));
-            printf("%lf\t", carbonRow.getDouble(7));
-            bool bool1 = carbonRow.getBoolean(8);
-            if (bool1) {
-                printf("true\t");
-            } else {
-                printf("false\t");
+            if (i < printNum) {
+                printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2));
+                jobjectArray array1 = carbonRow.getArray(3);
+                jsize length = env->GetArrayLength(array1);
+                int j = 0;
+                for (j = 0; j < length; j++) {
+                    jobject element = env->GetObjectArrayElement(array1, j);
+                    char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE);
+                    printf("%s\t", str);
+                }
+                printf("%d\t", carbonRow.getShort(4));
+                printf("%d\t", carbonRow.getInt(5));
+                printf("%ld\t", carbonRow.getLong(6));
+                printf("%lf\t", carbonRow.getDouble(7));
+                bool bool1 = carbonRow.getBoolean(8);
+                if (bool1) {
+                    printf("true\t");
+                } else {
+                    printf("false\t");
+                }
+                printf("%f\t\n", carbonRow.getFloat(9));
             }
-            printf("%f\t\n", carbonRow.getFloat(9));
             env->DeleteLocalRef(row);
         }
         carbonReader.close();
-        carbonRow.close();
+    } catch (jthrowable ex) {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+    }
+}
+
+/** Builds a size-slot row array (slot 0 = stringField, slot 1 = shortField as decimal text),
+ *  passes it to writer.write() and then deletes the JNI local references created here. */
+void writeData(JNIEnv *env, CarbonWriter writer, int size, jclass objClass, char *stringField, short shortField) {
+    jobjectArray arr = env->NewObjectArray(size, objClass, 0);
+    jobject jStringField = env->NewStringUTF(stringField);
+    env->SetObjectArrayElement(arr, 0, jStringField);
+
+    // snprintf is bounded by the buffer size; the legacy gcvt() takes no length argument
+    char ctrShort[10];
+    snprintf(ctrShort, sizeof(ctrShort), "%d", shortField % 10000);
+    jobject jShortField = env->NewStringUTF(ctrShort);
+    env->SetObjectArrayElement(arr, 1, jShortField);
+
+    writer.write(arr);
+    env->DeleteLocalRef(jStringField);
+    env->DeleteLocalRef(jShortField);
+    env->DeleteLocalRef(arr);
+}
+
+/**
+  * test WithTableProperties interface: write three rows with sort_columns set, then read them back
+  *
+  * @param env  jni env
+  * @param path file path
+  * @param argc argument counter
+  * @param argv argument vector
+  * @return true on success, false when a jthrowable was caught (it is described and cleared)
+  */
+bool testWithTableProperty(JNIEnv *env, char *path, int argc, char **argv) {
+
+    char *jsonSchema = "[{stringField:string},{shortField:short}]";
+    try {
+        CarbonWriter writer;
+        writer.builder(env);
+        writer.outputPath(path);
+        writer.withCsvInput(jsonSchema);
+        writer.withTableProperty("sort_columns", "shortField");
+        writer.writtenBy("CSDK");
+        if (argc > 3) {
+            writer.withHadoopConf("fs.s3a.access.key", argv[1]);
+            writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
+            writer.withHadoopConf("fs.s3a.endpoint", argv[3]);
+        }
+        writer.build();
+
+        int size = 10;
+        jclass objClass = env->FindClass("java/lang/String");
+        // rows are written with shortField = 22, 11, 33 while sort_columns is shortField
+        writeData(env, writer, size, objClass, "name3", 22);
+        writeData(env, writer, size, objClass, "name1", 11);
+        writeData(env, writer, size, objClass, "name2", 33);
+        writer.close();
+
+        CarbonReader carbonReader;
+        carbonReader.builder(env, path);
+        carbonReader.build();
+        CarbonRow carbonRow(env);
+        while (carbonReader.hasNext()) {
+            jobject row = carbonReader.readNextRow();
+            carbonRow.setCarbonRow(row);
+            printf("%d\t%s\t\n", carbonRow.getShort(0), carbonRow.getString(1));
+            env->DeleteLocalRef(row);
+        }
+        carbonReader.close();
+        return true;
+    } catch (jthrowable ex) {
+        env->ExceptionDescribe();
+        env->ExceptionClear();
+    }
+    return false;
+}
+
+/**
+  * test sortBy interface
+  *
+  * @param env  jni env
+  * @param path file path
+  * @param argc argument counter
+  * @param argv argument vector
+  * @return true or throw exception
+  */
+bool testSortBy(JNIEnv *env, char *path, int argc, char **argv) {
+
+    char *jsonSchema = "[{stringField:string},{shortField:short}]";
+    char *sort[1];
+    sort[0] = "shortField";
+    try {
+        CarbonWriter writer;
+        writer.builder(env);
+        writer.outputPath(path);
+        writer.withCsvInput(jsonSchema);
+        writer.sortBy(1, sort);
+        writer.writtenBy("CSDK");
+        if (argc > 3) {
+            writer.withHadoopConf("fs.s3a.access.key", argv[1]);
+            writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
+            writer.withHadoopConf("fs.s3a.endpoint", argv[3]);
+        }
+        writer.build();
+
+        int size = 10;
+        jclass objClass = env->FindClass("java/lang/String");
+
+        writeData(env, writer, size, objClass, "name3", 22);
+        writeData(env, writer, size, objClass, "name1", 11);
+        writeData(env, writer, size, objClass, "name2", 33);
+        writer.close();
+
+        CarbonReader carbonReader;
+        carbonReader.builder(env, path);
+        carbonReader.build();
+        int i = 0;
+        CarbonRow carbonRow(env);
+        while (carbonReader.hasNext()) {
+            jobject row = carbonReader.readNextRow();
+            i++;
+            carbonRow.setCarbonRow(row);
+            printf("%d\t%s\t\n", carbonRow.getShort(0), carbonRow.getString(1));
+            env->DeleteLocalRef(row);
+        }
+        carbonReader.close();
     } catch (jthrowable ex) {
         env->ExceptionDescribe();
         env->ExceptionClear();
@@ -688,24 +833,29 @@ int main(int argc, char *argv[]) {
         // TODO: need support read schema from S3 in the future
         testWriteData(env, S3WritePath, 4, argv);
         readFromS3(env, S3ReadPath, argv);
+        testWithTableProperty(env, "s3a://csdk/dataProperty", 4, argv);
+        testSortBy(env, "s3a://csdk/dataSort", 4, argv);
 
         testReadNextRow(env, S3Path, 100000, argv, 4, false);
         testReadNextRow(env, S3Path, 100000, argv, 4, true);
         testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, false);
         testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, true);
     } else {
+        int batch = 32000;
+        int printNum = 32000;
+
         tryCatchException(env);
         tryCarbonRowException(env, smallFilePath);
         testCarbonProperties(env);
         testWriteData(env, "./data", 1, argv);
-        testWriteData(env, "./data", 1, argv);
+        testWriteData(env, "./dataLoadOption", 1, argv);
         readFromLocalWithoutProjection(env, smallFilePath);
         readFromLocalWithProjection(env, smallFilePath);
         readSchema(env, path, false);
         readSchema(env, path, true);
+        testWithTableProperty(env, "./dataProperty", 1, argv);
+        testSortBy(env, "./dataSort", 1, argv);
 
-        int batch = 32000;
-        int printNum = 32000;
         testReadNextRow(env, path, printNum, argv, 0, true);
         testReadNextRow(env, path, printNum, argv, 0, false);
         testReadNextBatchRow(env, path, batch, printNum, argv, 0, true);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 1241504..5f8cdfe 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -77,8 +77,9 @@ public class CarbonWriterBuilder {
 
   /**
    * Sets the output path of the writer builder
+   *
    * @param path is the absolute path where output files are written
-   * This method must be called when building CarbonWriterBuilder
+   *             This method must be called when building CarbonWriterBuilder
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder outputPath(String path) {
@@ -89,9 +90,10 @@ public class CarbonWriterBuilder {
 
   /**
    * sets the list of columns that needs to be in sorted order
+   *
    * @param sortColumns is a string array of columns that needs to be sorted.
-   * If it is null or by default all dimensions are selected for sorting
-   * If it is empty array, no columns are sorted
+   *                    If it is null or by default all dimensions are selected for sorting
+   *                    If it is empty array, no columns are sorted
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder sortBy(String[] sortColumns) {
@@ -124,8 +126,9 @@ public class CarbonWriterBuilder {
   /**
    * sets the taskNo for the writer. SDKs concurrently running
    * will set taskNo in order to avoid conflicts in file's name during write.
+   *
    * @param taskNo is the TaskNo user wants to specify.
-   * by default it is system time in nano seconds.
+   *               by default it is system time in nano seconds.
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder taskNo(long taskNo) {
@@ -135,8 +138,9 @@ public class CarbonWriterBuilder {
 
   /**
    * to set the timestamp in the carbondata and carbonindex index files
+   *
    * @param timestamp is a timestamp to be used in the carbondata and carbonindex index files.
-   * By default set to zero.
+   *                  By default set to zero.
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder uniqueIdentifier(long timestamp) {
@@ -201,6 +205,22 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Adds a single load option for the sdk writer by delegating to withLoadOptions
+   *
+   * @param key   the key of load option, must not be null
+   * @param value the value of load option, must not be null
+   * @return updated CarbonWriterBuilder object
+   */
+  public CarbonWriterBuilder withLoadOption(String key, String value) {
+    Objects.requireNonNull(key, "key of load properties should not be null");
+    Objects.requireNonNull(value, "value of load properties should not be null");
+    Map map = new HashMap();
+    map.put(key, value);
+    withLoadOptions(map);
+    return this;
+  }
+
+  /**
    * To support the table properties for sdk writer
    *
    * @param options key,value pair of create table properties.
@@ -270,6 +290,22 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Adds a single table property for the sdk writer by delegating to withTableProperties
+   *
+   * @param key   property key, must not be null
+   * @param value property value, must not be null
+   * @return CarbonWriterBuilder object
+   */
+  public CarbonWriterBuilder withTableProperty(String key, String value) {
+    Objects.requireNonNull(key, "key of table properties should not be null");
+    Objects.requireNonNull(value, "value of table properties  should not be null");
+    Map map = new HashMap();
+    map.put(key, value);
+    withTableProperties(map);
+    return this;
+  }
+
+  /**
    * To make sdk writer thread safe.
    *
    * @param numOfThreads should number of threads in which writer is called in multi-thread scenario
@@ -316,8 +352,9 @@ public class CarbonWriterBuilder {
 
   /**
    * To set the carbondata file size in MB between 1MB-2048MB
+   *
    * @param blockSize is size in MB between 1MB to 2048 MB
-   * default value is 1024 MB
+   *                  default value is 1024 MB
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withBlockSize(int blockSize) {
@@ -329,7 +366,7 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * @param localDictionaryThreshold is localDictionaryThreshold,default is 10000
+   * @param localDictionaryThreshold is localDictionaryThreshold, default is 10000
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold) {
@@ -351,7 +388,7 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * @param enableLocalDictionary enable local dictionary  , default is false
+   * @param enableLocalDictionary enable local dictionary, default is false
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary) {
@@ -362,8 +399,9 @@ public class CarbonWriterBuilder {
 
   /**
    * To set the blocklet size of CarbonData file
+   *
    * @param blockletSize is blocklet size in MB
-   * default value is 64 MB
+   *                     default value is 64 MB
    * @return updated CarbonWriterBuilder
    */
   public CarbonWriterBuilder withBlockletSize(int blockletSize) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 58b9b59..156ca5f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -543,4 +543,42 @@ public class CSVCarbonWriterTest {
     }
   }
 
+  @Test
+  public void testWithTableProperties() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    try {
+      CarbonWriter writer = CarbonWriter.builder()
+          .taskNo(5)
+          .outputPath(path)
+          .withCsvInput(new Schema(fields))
+          .writtenBy("CSVCarbonWriterTest")
+          .withTableProperty("sort_columns", "name")
+          .build();
+      writer.write(new String[]{"name3", "21"});
+      writer.write(new String[]{"name1", "7"});
+      writer.write(new String[]{"name2", "18"});
+      writer.close();
+
+      CarbonReader reader = CarbonReader.builder(path, "test").build();
+      int i = 0;
+      while (reader.hasNext()) {
+        i++;
+        Object[] row = (Object[]) reader.readNextRow();
+        Assert.assertTrue(("name" + i).equalsIgnoreCase(row[0].toString()));
+      }
+      Assert.assertEquals(3, i);  // an empty read would otherwise pass silently
+      reader.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    } finally {
+      FileUtils.deleteDirectory(new File(path));
+    }
+  }
+
 }