You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/18 04:32:29 UTC
carbondata git commit: [CARBONDATA-3073] Support configure
TableProperties,
withLoadOption etc. interface in carbon writer of C++ SDK support
withTableProperty, withLoadOption, taskNo, uniqueIdentifier, withThreadSafe,
withBlockSize, withBlockletSize, local
Repository: carbondata
Updated Branches:
refs/heads/master 38abb6bd9 -> eb52c37c3
[CARBONDATA-3073] Support configure TableProperties,withLoadOption etc.
interface in carbon writer of C++ SDK support withTableProperty,
withLoadOption,taskNo, uniqueIdentifier, withThreadSafe,withBlockSize,
withBlockletSize, localDictionaryThreshold, enableLocalDictionary, sortBy in C++ SDK
support withTableProperty, withLoadOption,taskNo, uniqueIdentifier, withThreadSafe,
withBlockSize, withBlockletSize, localDictionaryThreshold, enableLocalDictionary,sortBy in C++ SDK
This closes #2899
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb52c37c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb52c37c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb52c37c
Branch: refs/heads/master
Commit: eb52c37c38b474c48e620a5bdd3f1cf9d68ffda6
Parents: 38abb6b
Author: xubo245 <xu...@huawei.com>
Authored: Mon Nov 5 19:43:19 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Tue Dec 18 10:01:07 2018 +0530
----------------------------------------------------------------------
docs/csdk-guide.md | 125 +++++
.../examples/sdk/CarbonReaderExample.java | 4 +-
.../examples/sdk/SDKS3ReadExample.java | 15 +-
store/CSDK/CMakeLists.txt | 6 +-
store/CSDK/src/CarbonReader.h | 4 +-
store/CSDK/src/CarbonSchemaReader.cpp | 4 +-
store/CSDK/src/CarbonWriter.cpp | 211 +++++++++
store/CSDK/src/CarbonWriter.h | 105 +++++
store/CSDK/src/Schema.h | 5 -
store/CSDK/test/main.cpp | 468 ++++++++++++-------
.../sdk/file/CarbonWriterBuilder.java | 56 ++-
.../sdk/file/CSVCarbonWriterTest.java | 38 ++
12 files changed, 850 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/docs/csdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/csdk-guide.md b/docs/csdk-guide.md
index 6002cf5..f82c4fb 100644
--- a/docs/csdk-guide.md
+++ b/docs/csdk-guide.md
@@ -195,6 +195,18 @@ release the memory and destroy JVM.
```
/**
+ * sets the list of columns that need to be in sorted order
+ *
+ * @param argc argument counter, the number of column names in argv
+ * @param argv string array of the column names to sort by; must not be NULL.
+ * If sortBy is not called, all dimensions are selected for sorting by default.
+ * If argc is 0 (empty array), no columns are sorted.
+ */
+ void sortBy(int argc, char *argv[]);
+```
+
+```
+ /**
* configure the schema with json style schema
*
* @param jsonSchema json style schema
@@ -215,6 +227,119 @@ release the memory and destroy JVM.
```
```
+ /**
+ * To support the table properties for writer
+ *
+ * @param key properties key
+ * @param value properties value
+ */
+ void withTableProperty(char *key, char *value);
+```
+
+```
+ /**
+ * To support the load options for C++ sdk writer
+ *
+ * @param key load option key, one of the supported keys listed below
+ * @param value load option value
+ * supported keys and values are
+ * a. bad_records_logger_enable -- true (write into separate logs), false
+ * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+ * c. bad_record_path -- path
+ * d. dateformat -- same as JAVA SimpleDateFormat
+ * e. timestampformat -- same as JAVA SimpleDateFormat
+ * f. complex_delimiter_level_1 -- value to Split the complexTypeData
+ * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+ * h. quotechar
+ * i. escapechar
+ *
+ * Default values are as follows.
+ *
+ * a. bad_records_logger_enable -- "false"
+ * b. bad_records_action -- "FAIL"
+ * c. bad_record_path -- ""
+ * d. dateformat -- "" , uses from carbon.properties file
+ * e. timestampformat -- "", uses from carbon.properties file
+ * f. complex_delimiter_level_1 -- "$"
+ * g. complex_delimiter_level_2 -- ":"
+ * h. quotechar -- "\""
+ * i. escapechar -- "\\"
+ */
+ void withLoadOption(char *key, char *value);
+```
+
+```
+ /**
+ * sets the taskNo for the writer. CSDKs concurrently running
+ * will set taskNo in order to avoid conflicts in file's name during write.
+ *
+ * @param taskNo is the TaskNo user wants to specify.
+ * by default it is system time in nano seconds.
+ */
+ void taskNo(long taskNo);
+```
+
+```
+ /**
+ * to set the timestamp in the carbondata and carbonindex index files
+ *
+ * @param timestamp is a timestamp to be used in the carbondata and carbonindex files.
+ * It must be a positive number. By default set to zero.
+ */
+ void uniqueIdentifier(long timestamp);
+```
+
+```
+ /**
+ * To make c++ sdk writer thread safe.
+ *
+ * @param numOfThreads number of threads in which the writer is called in a multi-thread
+ * scenario; must be at least 1. By default the C++ SDK writer is not thread safe,
+ * so one writer instance can be used in one thread only.
+ */
+ void withThreadSafe(short numOfThreads) ;
+```
+
+```
+ /**
+ * To set the carbondata file size in MB between 1MB-2048MB
+ *
+ * @param blockSize is size in MB between 1MB to 2048 MB
+ * default value is 1024 MB
+ */
+ void withBlockSize(int blockSize);
+```
+
+```
+ /**
+ * To set the blocklet size of CarbonData file
+ *
+ * @param blockletSize is blocklet size in MB
+ * default value is 64 MB
+ */
+ void withBlockletSize(int blockletSize);
+```
+
+```
+ /**
+ * @param localDictionaryThreshold local dictionary threshold; must be a positive number,
+ * default is 10000
+ */
+ void localDictionaryThreshold(int localDictionaryThreshold);
+```
+
+```
+ /**
+ * @param enableLocalDictionary whether to enable local dictionary, default is false
+ */
+ void enableLocalDictionary(bool enableLocalDictionary);
+```
+
+```
/**
* @param appName appName which is writing the carbondata files
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
index c730070..dab86e6 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/CarbonReaderExample.java
@@ -56,11 +56,9 @@ public class CarbonReaderExample {
fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
fields[9] = new Field("varcharField", DataTypes.VARCHAR);
fields[10] = new Field("arrayField", DataTypes.createArrayType(DataTypes.STRING));
- Map<String, String> map = new HashMap<>();
- map.put("complex_delimiter_level_1", "#");
CarbonWriter writer = CarbonWriter.builder()
.outputPath(path)
- .withLoadOptions(map)
+ .withLoadOption("complex_delimiter_level_1", "#")
.withCsvInput(new Schema(fields))
.writtenBy("CarbonReaderExample")
.build();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
index 2462d8d..94e4c8d 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3ReadExample.java
@@ -43,26 +43,23 @@ public class SDKS3ReadExample {
System.exit(0);
}
- String path = "s3a://sdk/WriterOutput";
+ String path = "s3a://sdk/WriterOutput/carbondata5";
if (args.length > 3) {
path=args[3];
}
// Read data
-
EqualToExpression equalToExpression = new EqualToExpression(
new ColumnExpression("name", DataTypes.STRING),
new LiteralExpression("robot1", DataTypes.STRING));
- Configuration configuration = new Configuration();
- configuration.set(ACCESS_KEY, args[0]);
- configuration.set(SECRET_KEY, args[1]);
- configuration.set(ENDPOINT, args[2]);
CarbonReader reader = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
.filter(equalToExpression)
- .withHadoopConf(configuration)
+ .withHadoopConf(ACCESS_KEY, args[0])
+ .withHadoopConf(SECRET_KEY, args[1])
+ .withHadoopConf(ENDPOINT, args[2])
.build();
System.out.println("\nData:");
@@ -79,7 +76,9 @@ public class SDKS3ReadExample {
CarbonReader reader2 = CarbonReader
.builder(path, "_temp")
.projection(new String[]{"name", "age"})
- .withHadoopConf(configuration)
+ .withHadoopConf(ACCESS_KEY, args[0])
+ .withHadoopConf(SECRET_KEY, args[1])
+ .withHadoopConf(ENDPOINT, args[2])
.build();
System.out.println("\nData:");
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt
index 137e6fd..d35ebad 100644
--- a/store/CSDK/CMakeLists.txt
+++ b/store/CSDK/CMakeLists.txt
@@ -14,7 +14,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 2.8)
-project(CJDK)
+project(CSDK)
set(CMAKE_BUILD_TYPE Debug)
SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH true)
@@ -27,8 +27,8 @@ set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/Carbo
src/CarbonRow.cpp src/CarbonWriter.h src/CarbonWriter.cpp src/CarbonSchemaReader.h
src/CarbonSchemaReader.cpp src/Schema.h src/Schema.cpp src/CarbonProperties.cpp src/CarbonProperties.h)
-add_executable(CJDK ${SOURCE_FILES})
+add_executable(CSDK ${SOURCE_FILES})
get_filename_component(JAVA_JVM_LIBRARY_DIR ${JAVA_JVM_LIBRARY} DIRECTORY)
message(${JAVA_JVM_LIBRARY_DIR})
-target_link_libraries(CJDK ${JAVA_JVM_LIBRARY})
+target_link_libraries(CSDK ${JAVA_JVM_LIBRARY})
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonReader.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonReader.h b/store/CSDK/src/CarbonReader.h
index 0b4a82e..50cc851 100644
--- a/store/CSDK/src/CarbonReader.h
+++ b/store/CSDK/src/CarbonReader.h
@@ -94,8 +94,8 @@ public:
/**
* Configure the projection column names of carbon reader
*
- * @param argc argument counter
- * @param argv argument vector
+ * @param argc argument counter, the number of projection column
+ * @param argv argument vector, projection column names
*/
void projection(int argc, char *argv[]);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonSchemaReader.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonSchemaReader.cpp b/store/CSDK/src/CarbonSchemaReader.cpp
index c895f4b..2743bfe 100644
--- a/store/CSDK/src/CarbonSchemaReader.cpp
+++ b/store/CSDK/src/CarbonSchemaReader.cpp
@@ -34,7 +34,7 @@ jobject CarbonSchemaReader::readSchema(char *path) {
throw std::runtime_error("path parameter can't be NULL.");
}
jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
- "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+ "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
if (methodID == NULL) {
throw std::runtime_error("Can't find the method in java: readSchema");
}
@@ -53,7 +53,7 @@ jobject CarbonSchemaReader::readSchema(char *path, bool validateSchema) {
throw std::runtime_error("path parameter can't be NULL.");
}
jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
- "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+ "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
if (methodID == NULL) {
throw std::runtime_error("Can't find the method in java: readSchema");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonWriter.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonWriter.cpp b/store/CSDK/src/CarbonWriter.cpp
index 2a4bcc7..6ce5d7d 100644
--- a/store/CSDK/src/CarbonWriter.cpp
+++ b/store/CSDK/src/CarbonWriter.cpp
@@ -58,6 +58,38 @@ void CarbonWriter::outputPath(char *path) {
carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
}
+/**
+ * Sets the columns the written data is sorted by, by calling
+ * CarbonWriterBuilder.sortBy(String[]) on the Java builder.
+ *
+ * @param argc number of column names in argv; must not be negative
+ * @param argv column names to sort by; neither argv nor any entry may be NULL
+ * @throws std::runtime_error on invalid arguments or when the Java method/class is missing
+ * @throws jthrowable the pending Java exception when a JNI/Java call fails
+ */
+void CarbonWriter::sortBy(int argc, char **argv) {
+    if (argc < 0) {
+        throw std::runtime_error("argc parameter can't be negative.");
+    }
+    if (argv == NULL) {
+        throw std::runtime_error("argv parameter can't be NULL.");
+    }
+    for (int i = 0; i < argc; ++i) {
+        if (argv[i] == NULL) {
+            throw std::runtime_error("argv parameter can't contain NULL.");
+        }
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "sortBy",
+        "([Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: sortBy");
+    }
+    // FindClass expects a fully-qualified class name ("java/lang/String"),
+    // not a field descriptor ("Ljava/lang/String;"); the latter is not portable.
+    jclass stringClass = jniEnv->FindClass("java/lang/String");
+    if (stringClass == NULL) {
+        throw std::runtime_error("Can't find the class in java: java/lang/String");
+    }
+    jobjectArray array = jniEnv->NewObjectArray(argc, stringClass, NULL);
+    jniEnv->DeleteLocalRef(stringClass);
+    if (array == NULL) {
+        // allocation failed; a Java exception is pending
+        throw jniEnv->ExceptionOccurred();
+    }
+    for (int i = 0; i < argc; ++i) {
+        jstring column = jniEnv->NewStringUTF(argv[i]);
+        if (column == NULL) {
+            jniEnv->DeleteLocalRef(array);
+            throw jniEnv->ExceptionOccurred();
+        }
+        jniEnv->SetObjectArrayElement(array, i, column);
+        // the array now references the string; drop the local ref so a long
+        // column list cannot exhaust the local reference table
+        jniEnv->DeleteLocalRef(column);
+    }
+
+    jvalue args[1];
+    args[0].l = array;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    jniEnv->DeleteLocalRef(array);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
void CarbonWriter::withCsvInput(char *jsonSchema) {
if (jsonSchema == NULL) {
throw std::runtime_error("jsonSchema parameter can't be NULL.");
@@ -98,6 +130,185 @@ void CarbonWriter::withHadoopConf(char *key, char *value) {
carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
}
+/**
+ * Adds one table property by calling
+ * CarbonWriterBuilder.withTableProperty(String, String) on the Java builder.
+ *
+ * @param key table property key, must not be NULL
+ * @param value table property value, must not be NULL
+ * @throws std::runtime_error on NULL arguments or when the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::withTableProperty(char *key, char *value) {
+    if (key == NULL) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (value == NULL) {
+        throw std::runtime_error("value parameter can't be NULL.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withTableProperty",
+        "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withTableProperty");
+    }
+    jstring jKey = jniEnv->NewStringUTF(key);
+    jstring jValue = jniEnv->NewStringUTF(value);
+    jvalue args[2];
+    args[0].l = jKey;
+    args[1].l = jValue;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // release the temporary Java strings (DeleteLocalRef is legal with an
+    // exception pending) before reporting any Java failure
+    jniEnv->DeleteLocalRef(jKey);
+    jniEnv->DeleteLocalRef(jValue);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Adds one load option by calling
+ * CarbonWriterBuilder.withLoadOption(String, String) on the Java builder.
+ *
+ * @param key load option key, must not be NULL
+ * @param value load option value, must not be NULL
+ * @throws std::runtime_error on NULL arguments or when the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::withLoadOption(char *key, char *value) {
+    if (key == NULL) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (value == NULL) {
+        throw std::runtime_error("value parameter can't be NULL.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withLoadOption",
+        "(Ljava/lang/String;Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withLoadOption");
+    }
+    jstring jKey = jniEnv->NewStringUTF(key);
+    jstring jValue = jniEnv->NewStringUTF(value);
+    jvalue args[2];
+    args[0].l = jKey;
+    args[1].l = jValue;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    // release the temporary Java strings (DeleteLocalRef is legal with an
+    // exception pending) before reporting any Java failure
+    jniEnv->DeleteLocalRef(jKey);
+    jniEnv->DeleteLocalRef(jValue);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the task number to CarbonWriterBuilder.taskNo(long) on the Java side.
+ *
+ * @param taskNo task number to use; negative values are rejected
+ * @throws std::runtime_error if taskNo is negative or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::taskNo(long taskNo) {
+    if (taskNo < 0) {
+        throw std::runtime_error("taskNo parameter can't be negative.");
+    }
+    checkBuilder();
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    const char *signature = "(J)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jmethodID taskNoMethod = jniEnv->GetMethodID(builderClass, "taskNo", signature);
+    if (taskNoMethod == NULL) {
+        throw std::runtime_error("Can't find the method in java: taskNo");
+    }
+    jvalue argument;
+    argument.j = taskNo;
+    carbonWriterBuilderObject =
+        jniEnv->CallObjectMethodA(carbonWriterBuilderObject, taskNoMethod, &argument);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the timestamp used in the carbondata and carbonindex file names to
+ * CarbonWriterBuilder.uniqueIdentifier(long) on the Java side.
+ *
+ * @param timestamp unique identifier; must be a positive number (0 is rejected too)
+ * @throws std::runtime_error if timestamp < 1 or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::uniqueIdentifier(long timestamp) {
+    // the check rejects 0 as well, so the message must say "positive",
+    // not "can't be negative"
+    if (timestamp < 1) {
+        throw std::runtime_error("timestamp parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "uniqueIdentifier",
+        "(J)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: uniqueIdentifier");
+    }
+    jvalue args[1];
+    args[0].j = timestamp;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the number of writer threads to CarbonWriterBuilder.withThreadSafe(short).
+ *
+ * @param numOfThreads number of threads that will call the writer; must be at least 1
+ * @throws std::runtime_error if numOfThreads < 1 or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::withThreadSafe(short numOfThreads) {
+    // the check rejects 0 as well, so the message must say "positive",
+    // not "can't be negative"
+    if (numOfThreads < 1) {
+        throw std::runtime_error("numOfThreads parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "withThreadSafe",
+        "(S)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: withThreadSafe");
+    }
+    jvalue args[1];
+    args[0].s = numOfThreads;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the carbondata block size (in MB) to CarbonWriterBuilder.withBlockSize(int).
+ * Only values below 1 are rejected here; no upper bound is checked in C++.
+ *
+ * @param blockSize block size in MB
+ * @throws std::runtime_error if blockSize < 1 or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::withBlockSize(int blockSize) {
+    if (blockSize < 1) {
+        throw std::runtime_error("blockSize parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jmethodID blockSizeMethod = jniEnv->GetMethodID(builderClass, "withBlockSize", signature);
+    if (blockSizeMethod == NULL) {
+        throw std::runtime_error("Can't find the method in java: withBlockSize");
+    }
+    jvalue argument;
+    argument.i = blockSize;
+    carbonWriterBuilderObject =
+        jniEnv->CallObjectMethodA(carbonWriterBuilderObject, blockSizeMethod, &argument);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the blocklet size (in MB) to CarbonWriterBuilder.withBlockletSize(int).
+ *
+ * @param blockletSize blocklet size in MB; values below 1 are rejected
+ * @throws std::runtime_error if blockletSize < 1 or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::withBlockletSize(int blockletSize) {
+    if (blockletSize < 1) {
+        throw std::runtime_error("blockletSize parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jmethodID blockletSizeMethod =
+        jniEnv->GetMethodID(builderClass, "withBlockletSize", signature);
+    if (blockletSizeMethod == NULL) {
+        throw std::runtime_error("Can't find the method in java: withBlockletSize");
+    }
+    jvalue argument;
+    argument.i = blockletSize;
+    carbonWriterBuilderObject =
+        jniEnv->CallObjectMethodA(carbonWriterBuilderObject, blockletSizeMethod, &argument);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the local dictionary threshold to
+ * CarbonWriterBuilder.localDictionaryThreshold(int) on the Java side.
+ *
+ * @param localDictionaryThreshold threshold value; values below 1 are rejected
+ * @throws std::runtime_error if the value is < 1 or the Java method is missing
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::localDictionaryThreshold(int localDictionaryThreshold) {
+    if (localDictionaryThreshold < 1) {
+        throw std::runtime_error("localDictionaryThreshold parameter should be positive number.");
+    }
+    checkBuilder();
+    jclass builderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    const char *signature = "(I)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;";
+    jmethodID thresholdMethod =
+        jniEnv->GetMethodID(builderClass, "localDictionaryThreshold", signature);
+    if (thresholdMethod == NULL) {
+        throw std::runtime_error("Can't find the method in java: localDictionaryThreshold");
+    }
+    jvalue argument;
+    argument.i = localDictionaryThreshold;
+    carbonWriterBuilderObject =
+        jniEnv->CallObjectMethodA(carbonWriterBuilderObject, thresholdMethod, &argument);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+/**
+ * Forwards the local dictionary switch to
+ * CarbonWriterBuilder.enableLocalDictionary(boolean) on the Java side.
+ *
+ * @param enableLocalDictionary true to enable local dictionary, false to disable it
+ * @throws std::runtime_error if the Java method can't be found
+ * @throws jthrowable the pending Java exception when the Java call fails
+ */
+void CarbonWriter::enableLocalDictionary(bool enableLocalDictionary) {
+    // No NULL check: a bool can never be NULL, and comparing it with NULL (0)
+    // would wrongly reject every call that passes `false`.
+    checkBuilder();
+    jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
+    jmethodID methodID = jniEnv->GetMethodID(carbonWriterBuilderClass, "enableLocalDictionary",
+        "(Z)Lorg/apache/carbondata/sdk/file/CarbonWriterBuilder;");
+    if (methodID == NULL) {
+        throw std::runtime_error("Can't find the method in java: enableLocalDictionary");
+    }
+    jvalue args[1];
+    args[0].z = enableLocalDictionary ? JNI_TRUE : JNI_FALSE;
+    carbonWriterBuilderObject = jniEnv->CallObjectMethodA(carbonWriterBuilderObject, methodID, args);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
void CarbonWriter::writtenBy(char *appName) {
checkBuilder();
jclass carbonWriterBuilderClass = jniEnv->GetObjectClass(carbonWriterBuilderObject);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/CarbonWriter.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonWriter.h b/store/CSDK/src/CarbonWriter.h
index a822432..fdb0bc6 100644
--- a/store/CSDK/src/CarbonWriter.h
+++ b/store/CSDK/src/CarbonWriter.h
@@ -77,6 +77,16 @@ public:
*/
void outputPath(char *path);
+ /**
+ * sets the list of columns that need to be in sorted order
+ *
+ * @param argc argument counter, the number of column names in argv
+ * @param argv string array of the column names to sort by; must not be NULL.
+ * If sortBy is not called, all dimensions are selected for sorting by default.
+ * If argc is 0 (empty array), no columns are sorted.
+ */
+ void sortBy(int argc, char *argv[]);
+
/**
* configure the schema with json style schema
*
@@ -95,6 +105,101 @@ public:
void withHadoopConf(char *key, char *value);
/**
+ * To support the table properties for writer
+ *
+ * @param key properties key
+ * @param value properties value
+ */
+ void withTableProperty(char *key, char *value);
+
+ /**
+ * To support the load options for C++ sdk writer
+ *
+ * @param key load option key, one of the supported keys listed below
+ * @param value load option value
+ * supported keys and values are
+ * a. bad_records_logger_enable -- true (write into separate logs), false
+ * b. bad_records_action -- FAIL, FORCE, IGNORE, REDIRECT
+ * c. bad_record_path -- path
+ * d. dateformat -- same as JAVA SimpleDateFormat
+ * e. timestampformat -- same as JAVA SimpleDateFormat
+ * f. complex_delimiter_level_1 -- value to Split the complexTypeData
+ * g. complex_delimiter_level_2 -- value to Split the nested complexTypeData
+ * h. quotechar
+ * i. escapechar
+ *
+ * Default values are as follows.
+ *
+ * a. bad_records_logger_enable -- "false"
+ * b. bad_records_action -- "FAIL"
+ * c. bad_record_path -- ""
+ * d. dateformat -- "" , uses from carbon.properties file
+ * e. timestampformat -- "", uses from carbon.properties file
+ * f. complex_delimiter_level_1 -- "$"
+ * g. complex_delimiter_level_2 -- ":"
+ * h. quotechar -- "\""
+ * i. escapechar -- "\\"
+ */
+ void withLoadOption(char *key, char *value);
+
+ /**
+ * sets the taskNo for the writer. CSDKs concurrently running
+ * will set taskNo in order to avoid conflicts in file's name during write.
+ *
+ * @param taskNo is the TaskNo user wants to specify.
+ * by default it is system time in nano seconds.
+ */
+ void taskNo(long taskNo);
+
+ /**
+ * to set the timestamp in the carbondata and carbonindex index files
+ *
+ * @param timestamp is a timestamp to be used in the carbondata and carbonindex files.
+ * It must be a positive number. By default set to zero.
+ */
+ void uniqueIdentifier(long timestamp);
+
+ /**
+ * To make c++ sdk writer thread safe.
+ *
+ * @param numOfThreads number of threads in which the writer is called in a multi-thread
+ * scenario; must be at least 1. By default the C++ SDK writer is not thread safe,
+ * so one writer instance can be used in one thread only.
+ */
+ void withThreadSafe(short numOfThreads) ;
+
+ /**
+ * To set the carbondata file size in MB between 1MB-2048MB
+ *
+ * @param blockSize is size in MB between 1MB to 2048 MB
+ * default value is 1024 MB
+ */
+ void withBlockSize(int blockSize);
+
+ /**
+ * To set the blocklet size of CarbonData file
+ *
+ * @param blockletSize is blocklet size in MB
+ * default value is 64 MB
+ */
+ void withBlockletSize(int blockletSize);
+
+ /**
+ * @param localDictionaryThreshold local dictionary threshold; must be a positive number,
+ * default is 10000
+ */
+ void localDictionaryThreshold(int localDictionaryThreshold);
+
+ /**
+ * @param enableLocalDictionary whether to enable local dictionary, default is false
+ */
+ void enableLocalDictionary(bool enableLocalDictionary);
+
+ /**
* @param appName appName which is writing the carbondata files
*/
void writtenBy(char *appName);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/src/Schema.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/Schema.h b/store/CSDK/src/Schema.h
index 67434ef..1306f46 100644
--- a/store/CSDK/src/Schema.h
+++ b/store/CSDK/src/Schema.h
@@ -17,11 +17,6 @@
#include <jni.h>
-#ifndef CJDK_SCHEMA_H
-#define CJDK_SCHEMA_H
-
-#endif //CJDK_SCHEMA_H
-
class Schema {
private:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/CSDK/test/main.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp
index 5ab3976..140420b 100644
--- a/store/CSDK/test/main.cpp
+++ b/store/CSDK/test/main.cpp
@@ -178,9 +178,9 @@ void printResult(JNIEnv *env, CarbonReader reader) {
*/
bool readSchema(JNIEnv *env, char *Path, bool validateSchema) {
printf("\nread Schema from Index File:\n");
- CarbonSchemaReader carbonSchemaReader(env);
- jobject schema;
try {
+ CarbonSchemaReader carbonSchemaReader(env);
+ jobject schema;
if (validateSchema) {
schema = carbonSchemaReader.readSchema(Path, validateSchema);
} else {
@@ -247,6 +247,7 @@ bool readFromLocalWithoutProjection(JNIEnv *env, char *path) {
carbonReaderClass.build();
} catch (jthrowable e) {
env->ExceptionDescribe();
+ env->ExceptionClear();
}
printResult(env, carbonReaderClass);
}
@@ -261,10 +262,9 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
printBoolean(useVectorReader);
printf("\n");
- struct timeval start, build, startRead, endBatchRead, endRead;
- gettimeofday(&start, NULL);
-
try {
+ struct timeval start, build, startRead, endBatchRead, endRead;
+ gettimeofday(&start, NULL);
CarbonReader carbonReaderClass;
carbonReaderClass.builder(env, path);
@@ -323,6 +323,7 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
carbonRow.close();
} catch (jthrowable) {
env->ExceptionDescribe();
+ env->ExceptionClear();
}
}
@@ -332,98 +333,100 @@ void testReadNextRow(JNIEnv *env, char *path, int printNum, char **argv, int arg
* @param env jni env
*/
void testReadNextBatchRow(JNIEnv *env, char *path, int batchSize, int printNum, char **argv, int argc,
- bool useVectorReader) {
- printf("\n\nTest next Batch Row Performance:\n");
- printBoolean(useVectorReader);
- printf("\n");
+ bool useVectorReader) {
+ try {
+ printf("\n\nTest next Batch Row Performance:\n");
+ printBoolean(useVectorReader);
+ printf("\n");
- struct timeval start, build, read;
- gettimeofday(&start, NULL);
+ struct timeval start, build, read;
+ gettimeofday(&start, NULL);
- CarbonReader carbonReaderClass;
+ CarbonReader carbonReaderClass;
- carbonReaderClass.builder(env, path);
- if (argc > 1) {
- carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
- carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
- carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
- }
- if (!useVectorReader) {
- carbonReaderClass.withRowRecordReader();
- }
- carbonReaderClass.withBatch(batchSize);
- try {
+ carbonReaderClass.builder(env, path);
+ if (argc > 1) {
+ carbonReaderClass.withHadoopConf("fs.s3a.access.key", argv[1]);
+ carbonReaderClass.withHadoopConf("fs.s3a.secret.key", argv[2]);
+ carbonReaderClass.withHadoopConf("fs.s3a.endpoint", argv[3]);
+ }
+ if (!useVectorReader) {
+ carbonReaderClass.withRowRecordReader();
+ }
+ carbonReaderClass.withBatch(batchSize);
carbonReaderClass.build();
- } catch (jthrowable e) {
- env->ExceptionDescribe();
- }
-
- gettimeofday(&build, NULL);
- int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
- double buildTime = time / 1000000.0;
- printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
- CarbonRow carbonRow(env);
- int i = 0;
- struct timeval startHasNext, startReadNextBatchRow, endReadNextBatchRow, endRead;
- gettimeofday(&startHasNext, NULL);
+ gettimeofday(&build, NULL);
+ int time = 1000000 * (build.tv_sec - start.tv_sec) + build.tv_usec - start.tv_usec;
+ double buildTime = time / 1000000.0;
+ printf("\n\nbuild time is: %lf s\n\n", time / 1000000.0);
- while (carbonReaderClass.hasNext()) {
+ CarbonRow carbonRow(env);
+ int i = 0;
+ struct timeval startHasNext, startReadNextBatchRow, endReadNextBatchRow, endRead;
+ gettimeofday(&startHasNext, NULL);
- gettimeofday(&startReadNextBatchRow, NULL);
- jobjectArray batch = carbonReaderClass.readNextBatchRow();
- if (env->ExceptionCheck()) {
- env->ExceptionDescribe();
- }
- gettimeofday(&endReadNextBatchRow, NULL);
+ while (carbonReaderClass.hasNext()) {
- jsize length = env->GetArrayLength(batch);
- if (i + length > printNum - 1) {
- for (int j = 0; j < length; j++) {
- i++;
- jobject row = env->GetObjectArrayElement(batch, j);
- carbonRow.setCarbonRow(row);
- carbonRow.getString(0);
- carbonRow.getString(1);
- carbonRow.getString(2);
- carbonRow.getString(3);
- carbonRow.getLong(4);
- carbonRow.getLong(5);
- if (i > 1 && i % printNum == 0) {
- gettimeofday(&read, NULL);
-
- double hasNextTime = 1000000 * (startReadNextBatchRow.tv_sec - startHasNext.tv_sec) +
- startReadNextBatchRow.tv_usec - startHasNext.tv_usec;
-
- double readNextBatchTime = 1000000 * (endReadNextBatchRow.tv_sec - startReadNextBatchRow.tv_sec) +
- endReadNextBatchRow.tv_usec - startReadNextBatchRow.tv_usec;
-
- time = 1000000 * (read.tv_sec - startHasNext.tv_sec) + read.tv_usec - startHasNext.tv_usec;
- printf("%d: time is %lf s, speed is %lf records/s, hasNext time is %lf s,readNextBatchRow time is %lf s ",
- i, time / 1000000.0, printNum / (time / 1000000.0), hasNextTime / 1000000.0,
- readNextBatchTime / 1000000.0);
- gettimeofday(&startHasNext, NULL);
- printf("%s\t", carbonRow.getString(0));
- printf("%s\t", carbonRow.getString(1));
- printf("%s\t", carbonRow.getString(2));
- printf("%s\t", carbonRow.getString(3));
- printf("%ld\t", carbonRow.getLong(4));
- printf("%ld\t", carbonRow.getLong(5));
- printf("\n");
+ gettimeofday(&startReadNextBatchRow, NULL);
+ jobjectArray batch = carbonReaderClass.readNextBatchRow();
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ }
+ gettimeofday(&endReadNextBatchRow, NULL);
+
+ jsize length = env->GetArrayLength(batch);
+ if (i + length > printNum - 1) {
+ for (int j = 0; j < length; j++) {
+ i++;
+ jobject row = env->GetObjectArrayElement(batch, j);
+ carbonRow.setCarbonRow(row);
+ carbonRow.getString(0);
+ carbonRow.getString(1);
+ carbonRow.getString(2);
+ carbonRow.getString(3);
+ carbonRow.getLong(4);
+ carbonRow.getLong(5);
+ if (i > 1 && i % printNum == 0) {
+ gettimeofday(&read, NULL);
+
+ double hasNextTime = 1000000 * (startReadNextBatchRow.tv_sec - startHasNext.tv_sec) +
+ startReadNextBatchRow.tv_usec - startHasNext.tv_usec;
+
+ double readNextBatchTime =
+ 1000000 * (endReadNextBatchRow.tv_sec - startReadNextBatchRow.tv_sec) +
+ endReadNextBatchRow.tv_usec - startReadNextBatchRow.tv_usec;
+
+ time = 1000000 * (read.tv_sec - startHasNext.tv_sec) + read.tv_usec - startHasNext.tv_usec;
+ printf("%d: time is %lf s, speed is %lf records/s, hasNext time is %lf s,readNextBatchRow time is %lf s ",
+ i, time / 1000000.0, printNum / (time / 1000000.0), hasNextTime / 1000000.0,
+ readNextBatchTime / 1000000.0);
+ gettimeofday(&startHasNext, NULL);
+ printf("%s\t", carbonRow.getString(0));
+ printf("%s\t", carbonRow.getString(1));
+ printf("%s\t", carbonRow.getString(2));
+ printf("%s\t", carbonRow.getString(3));
+ printf("%ld\t", carbonRow.getLong(4));
+ printf("%ld\t", carbonRow.getLong(5));
+ printf("\n");
+ }
+ env->DeleteLocalRef(row);
}
- env->DeleteLocalRef(row);
+ } else {
+ i = i + length;
}
- } else {
- i = i + length;
+ env->DeleteLocalRef(batch);
}
- env->DeleteLocalRef(batch);
+ gettimeofday(&endRead, NULL);
+ time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
+ printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s ",
+ i, buildTime, time / 1000000.0, i / (time / 1000000.0));
+ carbonReaderClass.close();
+ carbonRow.close();
+ } catch (jthrowable e) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
}
- gettimeofday(&endRead, NULL);
- time = 1000000 * (endRead.tv_sec - build.tv_sec) + endRead.tv_usec - build.tv_usec;
- printf("total line is: %d,\t build time is: %lf s,\tread time is %lf s, average speed is %lf records/s ",
- i, buildTime, time / 1000000.0, i / (time / 1000000.0));
- carbonReaderClass.close();
- carbonRow.close();
}
/**
@@ -434,54 +437,54 @@ void testReadNextBatchRow(JNIEnv *env, char *path, int batchSize, int printNum,
*/
bool readFromLocalWithProjection(JNIEnv *env, char *path) {
printf("\nRead data from local:\n");
+ try { // build a projected reader over path, print every row's 12 columns, then release reader and row wrapper
+ CarbonReader reader;
+ reader.builder(env, path, "test");
+ char *argv[12];
+ argv[0] = "stringField";
+ argv[1] = "shortField";
+ argv[2] = "intField";
+ argv[3] = "longField";
+ argv[4] = "doubleField";
+ argv[5] = "boolField";
+ argv[6] = "dateField";
+ argv[7] = "timeField";
+ argv[8] = "decimalField";
+ argv[9] = "varcharField";
+ argv[10] = "arrayField";
+ argv[11] = "floatField";
+ reader.projection(12, argv); // project the 12 columns that are read back by index below
- CarbonReader reader;
- reader.builder(env, path, "test");
+ reader.build();
- char *argv[12];
- argv[0] = "stringField";
- argv[1] = "shortField";
- argv[2] = "intField";
- argv[3] = "longField";
- argv[4] = "doubleField";
- argv[5] = "boolField";
- argv[6] = "dateField";
- argv[7] = "timeField";
- argv[8] = "decimalField";
- argv[9] = "varcharField";
- argv[10] = "arrayField";
- argv[11] = "floatField";
- reader.projection(12, argv);
+ CarbonRow carbonRow(env);
+ while (reader.hasNext()) {
+ jobject row = reader.readNextRow();
+ carbonRow.setCarbonRow(row);
- try {
- reader.build();
+ printf("%s\t", carbonRow.getString(0));
+ printf("%d\t", carbonRow.getShort(1));
+ printf("%d\t", carbonRow.getInt(2));
+ printf("%ld\t", carbonRow.getLong(3));
+ printf("%lf\t", carbonRow.getDouble(4));
+ printBoolean(carbonRow.getBoolean(5));
+ printf("%d\t", carbonRow.getInt(6));
+ printf("%ld\t", carbonRow.getLong(7));
+ printf("%s\t", carbonRow.getDecimal(8));
+ printf("%s\t", carbonRow.getVarchar(9));
+ printArray(env, carbonRow.getArray(10));
+ printf("%f\t", carbonRow.getFloat(11));
+ printf("\n");
+ env->DeleteLocalRef(row); // release the per-row local reference inside the loop
+ }
+ reader.close();
+ carbonRow.close();
+ return true; // declared bool: falling off the end of the function would be undefined behaviour
} catch (jthrowable e) {
env->ExceptionDescribe();
+ env->ExceptionClear();
+ return false; // a jthrowable was caught; the pending JNI exception has been reported and cleared
}
-
- CarbonRow carbonRow(env);
- while (reader.hasNext()) {
- jobject row = reader.readNextRow();
- carbonRow.setCarbonRow(row);
-
- printf("%s\t", carbonRow.getString(0));
- printf("%d\t", carbonRow.getShort(1));
- printf("%d\t", carbonRow.getInt(2));
- printf("%ld\t", carbonRow.getLong(3));
- printf("%lf\t", carbonRow.getDouble(4));
- printBoolean(carbonRow.getBoolean(5));
- printf("%d\t", carbonRow.getInt(6));
- printf("%ld\t", carbonRow.getLong(7));
- printf("%s\t", carbonRow.getDecimal(8));
- printf("%s\t", carbonRow.getVarchar(9));
- printArray(env, carbonRow.getArray(10));
- printf("%f\t", carbonRow.getFloat(11));
- printf("\n");
- env->DeleteLocalRef(row);
- }
-
- reader.close();
- carbonRow.close();
}
@@ -500,17 +503,23 @@ bool tryCatchException(JNIEnv *env) {
}
void testCarbonProperties(JNIEnv *env) {
- printf("%s", "test Carbon Properties:");
- CarbonProperties carbonProperties(env);
- char *key = "carbon.unsafe.working.memory.in.mb";
- printf("%s\t", carbonProperties.getProperty(key));
- printf("%s\t", carbonProperties.getProperty(key, "512"));
- carbonProperties.addProperty(key, "1024");
- printf("%s\t", carbonProperties.getProperty(key));
+ try { // reads carbon.unsafe.working.memory.in.mb, reads it again with fallback "512", sets it to "1024" and reads it back
+ printf("%s", "test Carbon Properties:");
+ CarbonProperties carbonProperties(env);
+ char *key = "carbon.unsafe.working.memory.in.mb";
+ printf("%s\t", carbonProperties.getProperty(key));
+ printf("%s\t", carbonProperties.getProperty(key, "512")); // NOTE(review): "512" presumably is returned only when the key is unset — confirm in CarbonProperties
+ carbonProperties.addProperty(key, "1024");
+ printf("%s\t", carbonProperties.getProperty(key));
+ } catch (jthrowable e) { // report the pending Java exception and clear it so the following tests in main can still run
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ }
}
/**
- * test write data to local disk
+ * test write data
+ * test WithLoadOption interface
*
* @param env jni env
* @param path file path
@@ -526,7 +535,15 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
writer.builder(env);
writer.outputPath(path);
writer.withCsvInput(jsonSchema);
+ writer.withLoadOption("complex_delimiter_level_1", "#");
writer.writtenBy("CSDK");
+ writer.taskNo(185);
+ writer.withThreadSafe(1);
+ writer.uniqueIdentifier(1549911814000000);
+ writer.withBlockSize(1);
+ writer.withBlockletSize(16);
+ writer.enableLocalDictionary(true);
+ writer.localDictionaryThreshold(10000);
if (argc > 3) {
writer.withHadoopConf("fs.s3a.access.key", argv[1]);
writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
@@ -534,7 +551,7 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
}
writer.build();
- int rowNum = 10;
+ int rowNum = 70000;
int size = 10;
long longValue = 0;
double doubleValue = 0;
@@ -610,35 +627,163 @@ bool testWriteData(JNIEnv *env, char *path, int argc, char *argv[]) {
carbonReader.builder(env, path);
carbonReader.build();
int i = 0;
+ int printNum = 10;
CarbonRow carbonRow(env);
while (carbonReader.hasNext()) {
jobject row = carbonReader.readNextRow();
i++;
carbonRow.setCarbonRow(row);
- printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2));
- jobjectArray array1 = carbonRow.getArray(3);
- jsize length = env->GetArrayLength(array1);
- int j = 0;
- for (j = 0; j < length; j++) {
- jobject element = env->GetObjectArrayElement(array1, j);
- char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE);
- printf("%s\t", str);
- }
- printf("%d\t", carbonRow.getShort(4));
- printf("%d\t", carbonRow.getInt(5));
- printf("%ld\t", carbonRow.getLong(6));
- printf("%lf\t", carbonRow.getDouble(7));
- bool bool1 = carbonRow.getBoolean(8);
- if (bool1) {
- printf("true\t");
- } else {
- printf("false\t");
+ if (i < printNum) {
+ printf("%s\t%d\t%ld\t", carbonRow.getString(0), carbonRow.getInt(1), carbonRow.getLong(2));
+ jobjectArray array1 = carbonRow.getArray(3);
+ jsize length = env->GetArrayLength(array1);
+ int j = 0;
+ for (j = 0; j < length; j++) {
+ jobject element = env->GetObjectArrayElement(array1, j);
+ char *str = (char *) env->GetStringUTFChars((jstring) element, JNI_FALSE);
+ printf("%s\t", str);
+ }
+ printf("%d\t", carbonRow.getShort(4));
+ printf("%d\t", carbonRow.getInt(5));
+ printf("%ld\t", carbonRow.getLong(6));
+ printf("%lf\t", carbonRow.getDouble(7));
+ bool bool1 = carbonRow.getBoolean(8);
+ if (bool1) {
+ printf("true\t");
+ } else {
+ printf("false\t");
+ }
+ printf("%f\t\n", carbonRow.getFloat(9));
}
- printf("%f\t\n", carbonRow.getFloat(9));
env->DeleteLocalRef(row);
}
carbonReader.close();
- carbonRow.close();
+ } catch (jthrowable ex) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ }
+}
+
+// Writes one row {stringField, shortField % 10000 as text} through the caller's writer (by reference, not a copy); arr has `size` slots, only 0 and 1 are set.
+void writeData(JNIEnv *env, CarbonWriter &writer, int size, jclass objClass, char *stringField, short shortField) {
+ jobjectArray arr = env->NewObjectArray(size, objClass, 0);
+ jobject jStringField = env->NewStringUTF(stringField);
+ env->SetObjectArrayElement(arr, 0, jStringField);
+ // snprintf is bounded and portable; gcvt is a legacy, non-standard API
+ char ctrShort[10];
+ snprintf(ctrShort, sizeof(ctrShort), "%d", shortField % 10000);
+ jobject jShortField = env->NewStringUTF(ctrShort);
+ env->SetObjectArrayElement(arr, 1, jShortField);
+
+ writer.write(arr);
+
+ env->DeleteLocalRef(jStringField);
+ env->DeleteLocalRef(jShortField);
+ env->DeleteLocalRef(arr);
+}
+
+/**
+ * test WithTableProperties interface
+ *
+ * @param env jni env
+ * @param path file path
+ * @param argc argument counter
+ * @param argv argument vector
+ * @return true or throw exception
+ */
+bool testWithTableProperty(JNIEnv *env, char *path, int argc, char **argv) {
+
+ char *jsonSchema = "[{stringField:string},{shortField:short}]";
+ try {
+ CarbonWriter writer;
+ writer.builder(env);
+ writer.outputPath(path);
+ writer.withCsvInput(jsonSchema);
+ writer.withTableProperty("sort_columns", "shortField");
+ writer.writtenBy("CSDK");
+ if (argc > 3) {
+ writer.withHadoopConf("fs.s3a.access.key", argv[1]);
+ writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
+ writer.withHadoopConf("fs.s3a.endpoint", argv[3]);
+ }
+ writer.build();
+
+ int size = 10;
+ jclass objClass = env->FindClass("java/lang/String");
+
+ writeData(env, writer, size, objClass, "name3", 22);
+ writeData(env, writer, size, objClass, "name1", 11);
+ writeData(env, writer, size, objClass, "name2", 33);
+ writer.close();
+
+ CarbonReader carbonReader;
+ carbonReader.builder(env, path);
+ carbonReader.build();
+ int i = 0;
+ CarbonRow carbonRow(env);
+ while (carbonReader.hasNext()) {
+ jobject row = carbonReader.readNextRow();
+ i++;
+ carbonRow.setCarbonRow(row);
+ printf("%d\t%s\t\n", carbonRow.getShort(0), carbonRow.getString(1));
+ env->DeleteLocalRef(row);
+ }
+ carbonReader.close();
+ } catch (jthrowable ex) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ }
+}
+
+/**
+ * test sortBy interface
+ *
+ * @param env jni env
+ * @param path file path
+ * @param argc argument counter
+ * @param argv argument vector
+ * @return true or throw exception
+ */
+bool testSortBy(JNIEnv *env, char *path, int argc, char **argv) {
+
+ char *jsonSchema = "[{stringField:string},{shortField:short}]";
+ char *sort[1];
+ sort[0] = "shortField";
+ try {
+ CarbonWriter writer;
+ writer.builder(env);
+ writer.outputPath(path);
+ writer.withCsvInput(jsonSchema);
+ writer.sortBy(1, sort);
+ writer.writtenBy("CSDK");
+ if (argc > 3) {
+ writer.withHadoopConf("fs.s3a.access.key", argv[1]);
+ writer.withHadoopConf("fs.s3a.secret.key", argv[2]);
+ writer.withHadoopConf("fs.s3a.endpoint", argv[3]);
+ }
+ writer.build();
+
+ int size = 10;
+ jclass objClass = env->FindClass("java/lang/String");
+
+ writeData(env, writer, size, objClass, "name3", 22);
+ writeData(env, writer, size, objClass, "name1", 11);
+ writeData(env, writer, size, objClass, "name2", 33);
+ writer.close();
+
+ CarbonReader carbonReader;
+ carbonReader.builder(env, path);
+ carbonReader.build();
+ int i = 0;
+ CarbonRow carbonRow(env);
+ while (carbonReader.hasNext()) {
+ jobject row = carbonReader.readNextRow();
+ i++;
+ carbonRow.setCarbonRow(row);
+ printf("%d\t%s\t\n", carbonRow.getShort(0), carbonRow.getString(1));
+ env->DeleteLocalRef(row);
+ }
+ carbonReader.close();
} catch (jthrowable ex) {
env->ExceptionDescribe();
env->ExceptionClear();
@@ -688,24 +833,29 @@ int main(int argc, char *argv[]) {
// TODO: need support read schema from S3 in the future
testWriteData(env, S3WritePath, 4, argv);
readFromS3(env, S3ReadPath, argv);
+ testWithTableProperty(env, "s3a://csdk/dataProperty", 4, argv);
+ testSortBy(env, "s3a://csdk/dataSort", 4, argv);
testReadNextRow(env, S3Path, 100000, argv, 4, false);
testReadNextRow(env, S3Path, 100000, argv, 4, true);
testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, false);
testReadNextBatchRow(env, S3Path, 100000, 100000, argv, 4, true);
} else {
+ int batch = 32000;
+ int printNum = 32000;
+
tryCatchException(env);
tryCarbonRowException(env, smallFilePath);
testCarbonProperties(env);
testWriteData(env, "./data", 1, argv);
- testWriteData(env, "./data", 1, argv);
+ testWriteData(env, "./dataLoadOption", 1, argv);
readFromLocalWithoutProjection(env, smallFilePath);
readFromLocalWithProjection(env, smallFilePath);
readSchema(env, path, false);
readSchema(env, path, true);
+ testWithTableProperty(env, "./dataProperty", 1, argv);
+ testSortBy(env, "./dataSort", 1, argv);
- int batch = 32000;
- int printNum = 32000;
testReadNextRow(env, path, printNum, argv, 0, true);
testReadNextRow(env, path, printNum, argv, 0, false);
testReadNextBatchRow(env, path, batch, printNum, argv, 0, true);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 1241504..5f8cdfe 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -77,8 +77,9 @@ public class CarbonWriterBuilder {
/**
* Sets the output path of the writer builder
+ *
* @param path is the absolute path where output files are written
- * This method must be called when building CarbonWriterBuilder
+ * This method must be called when building CarbonWriterBuilder
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder outputPath(String path) {
@@ -89,9 +90,10 @@ public class CarbonWriterBuilder {
/**
* sets the list of columns that needs to be in sorted order
+ *
* @param sortColumns is a string array of columns that needs to be sorted.
- * If it is null or by default all dimensions are selected for sorting
- * If it is empty array, no columns are sorted
+ * If it is null or by default all dimensions are selected for sorting
+ * If it is empty array, no columns are sorted
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder sortBy(String[] sortColumns) {
@@ -124,8 +126,9 @@ public class CarbonWriterBuilder {
/**
* sets the taskNo for the writer. SDKs concurrently running
* will set taskNo in order to avoid conflicts in file's name during write.
+ *
* @param taskNo is the TaskNo user wants to specify.
- * by default it is system time in nano seconds.
+ * by default it is system time in nano seconds.
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder taskNo(long taskNo) {
@@ -135,8 +138,9 @@ public class CarbonWriterBuilder {
/**
* to set the timestamp in the carbondata and carbonindex index files
+ *
* @param timestamp is a timestamp to be used in the carbondata and carbonindex index files.
- * By default set to zero.
+ * By default set to zero.
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder uniqueIdentifier(long timestamp) {
@@ -201,6 +205,22 @@ public class CarbonWriterBuilder {
}
/**
+ * To support a single load option for sdk writer; delegates to {@link #withLoadOptions(Map)}
+ *
+ * @param key the key of load option, must not be null
+ * @param value the value of load option, must not be null
+ * @return updated CarbonWriterBuilder object
+ */
+ public CarbonWriterBuilder withLoadOption(String key, String value) {
+ Objects.requireNonNull(key, "key of load properties should not be null");
+ Objects.requireNonNull(value, "value of load properties should not be null");
+ Map<String, String> map = new HashMap<>();
+ map.put(key, value);
+ withLoadOptions(map);
+ return this;
+ }
+
+ /**
* To support the table properties for sdk writer
*
* @param options key,value pair of create table properties.
@@ -270,6 +290,22 @@ public class CarbonWriterBuilder {
}
/**
+ * To support a single table property for sdk writer; delegates to {@link #withTableProperties(Map)}
+ *
+ * @param key property key, must not be null
+ * @param value property value, must not be null
+ * @return updated CarbonWriterBuilder object
+ */
+ public CarbonWriterBuilder withTableProperty(String key, String value) {
+ Objects.requireNonNull(key, "key of table properties should not be null");
+ Objects.requireNonNull(value, "value of table properties should not be null");
+ Map<String, String> map = new HashMap<>();
+ map.put(key, value);
+ withTableProperties(map);
+ return this;
+ }
+
+ /**
* To make sdk writer thread safe.
*
* @param numOfThreads should number of threads in which writer is called in multi-thread scenario
@@ -316,8 +352,9 @@ public class CarbonWriterBuilder {
/**
* To set the carbondata file size in MB between 1MB-2048MB
+ *
* @param blockSize is size in MB between 1MB to 2048 MB
- * default value is 1024 MB
+ * default value is 1024 MB
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withBlockSize(int blockSize) {
@@ -329,7 +366,7 @@ public class CarbonWriterBuilder {
}
/**
- * @param localDictionaryThreshold is localDictionaryThreshold,default is 10000
+ * @param localDictionaryThreshold is localDictionaryThreshold, default is 10000
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder localDictionaryThreshold(int localDictionaryThreshold) {
@@ -351,7 +388,7 @@ public class CarbonWriterBuilder {
}
/**
- * @param enableLocalDictionary enable local dictionary , default is false
+ * @param enableLocalDictionary enable local dictionary, default is false
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder enableLocalDictionary(boolean enableLocalDictionary) {
@@ -362,8 +399,9 @@ public class CarbonWriterBuilder {
/**
* To set the blocklet size of CarbonData file
+ *
* @param blockletSize is blocklet size in MB
- * default value is 64 MB
+ * default value is 64 MB
* @return updated CarbonWriterBuilder
*/
public CarbonWriterBuilder withBlockletSize(int blockletSize) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb52c37c/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index 58b9b59..156ca5f 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -543,4 +543,42 @@ public class CSVCarbonWriterTest {
}
}
+ /** Rows written out of order must be read back sorted by the sort_columns table property. */
+ @Test
+ public void testWithTableProperties() throws IOException {
+ String path = "./testWriteFiles";
+ FileUtils.deleteDirectory(new File(path));
+ Field[] fields = new Field[2];
+ fields[0] = new Field("name", DataTypes.STRING);
+ fields[1] = new Field("age", DataTypes.INT);
+
+ try {
+ CarbonWriter writer = CarbonWriter.builder()
+ .taskNo(5)
+ .outputPath(path)
+ .withCsvInput(new Schema(fields))
+ .writtenBy("CSVCarbonWriterTest")
+ .withTableProperty("sort_columns", "name")
+ .build();
+ writer.write(new String[]{"name3", "21"});
+ writer.write(new String[]{"name1", "7"});
+ writer.write(new String[]{"name2", "18"});
+ writer.close();
+ CarbonReader reader = CarbonReader.builder(path, "test").build();
+ int i = 0;
+ while (reader.hasNext()) {
+ i++;
+ Object[] row = (Object[]) reader.readNextRow();
+ Assert.assertTrue(("name" + i).equalsIgnoreCase(row[0].toString()));
+ }
+ reader.close();
+ Assert.assertEquals(3, i); // guard against a vacuous pass when the reader returns no rows
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ FileUtils.deleteDirectory(new File(path));
+ }
+ }
+
}