You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2018/12/19 02:45:31 UTC
carbondata git commit: [CARBONDATA-2999] support read schema from S3
Repository: carbondata
Updated Branches:
refs/heads/master cabafe56d -> 96cc1a6ab
[CARBONDATA-2999] support read schema from S3
This closes #2931
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/96cc1a6a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/96cc1a6a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/96cc1a6a
Branch: refs/heads/master
Commit: 96cc1a6abb8200f4dda76b4b73a6d9df14038a5e
Parents: cabafe5
Author: xubo245 <xu...@huawei.com>
Authored: Tue Nov 20 15:36:43 2018 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Wed Dec 19 10:43:44 2018 +0800
----------------------------------------------------------------------
.../core/datastore/impl/FileFactory.java | 4 +-
docs/csdk-guide.md | 32 +++++-
docs/sdk-guide.md | 30 ++++++
.../carbondata/examples/sdk/SDKS3Example.java | 1 -
.../examples/sdk/SDKS3SchemaReadExample.java | 69 +++++++++++++
store/CSDK/CMakeLists.txt | 2 +-
store/CSDK/src/CarbonSchemaReader.cpp | 22 +++-
store/CSDK/src/CarbonSchemaReader.h | 26 +++++
store/CSDK/src/Configuration.cpp | 101 +++++++++++++++++++
store/CSDK/src/Configuration.h | 85 ++++++++++++++++
store/CSDK/test/main.cpp | 24 +++--
.../carbondata/sdk/file/CarbonSchemaReader.java | 84 +++++++++++----
12 files changed, 444 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index 8bd3c8e..0d828ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -120,9 +120,11 @@ public final class FileFactory {
throws IOException {
return getDataInputStream(path, fileType, bufferSize, getConfiguration());
}
+
+  /**
+   * Opens a DataInputStream on the given path. The supplied Hadoop configuration
+   * is passed both when resolving the CarbonFile and when opening the stream.
+   */
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize,
Configuration configuration) throws IOException {
- return getCarbonFile(path).getDataInputStream(path, fileType, bufferSize, configuration);
+    CarbonFile carbonFile = getCarbonFile(path, configuration);
+    return carbonFile.getDataInputStream(path, fileType, bufferSize, configuration);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/docs/csdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/csdk-guide.md b/docs/csdk-guide.md
index f82c4fb..75b6364 100644
--- a/docs/csdk-guide.md
+++ b/docs/csdk-guide.md
@@ -408,8 +408,38 @@ release the memory and destroy JVM.
jobject readSchema(char *path, bool validateSchema);
```
+```
+ /**
+ * Read the schema from a path.
+ * The path can be a folder, a carbonindex file, or a carbondata file.
+ * Does not check that all files in the path share the same schema.
+ *
+ * @param path file or folder path
+ * @param conf configuration used to access the path, e.g. to set the S3A
+ * access key, secret key, endpoint and other options
+ * @return schema
+ */
+ jobject readSchema(char *path, Configuration conf);
+```
+
+```
+ /**
+ * Read the schema from a path.
+ * The path can be a folder, a carbonindex file, or a carbondata file.
+ * The caller decides whether to check that all files share the same schema.
+ *
+ * @param path carbon data path
+ * @param validateSchema whether to check that all files share the same schema
+ * @param conf configuration used to access the path, e.g. to set the S3A
+ * access key, secret key, endpoint and other options
+ * @return schema
+ */
+ jobject readSchema(char *path, bool validateSchema, Configuration conf);
+```
+
### Schema
-```
+```
/**
* constructor with jni env and carbon schema data
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/docs/sdk-guide.md
----------------------------------------------------------------------
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index b157e51..dc1fe46 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -812,6 +812,36 @@ Find example code at [CarbonReaderExample](https://github.com/apache/carbondata/
```
/**
+ * Read the schema from a path.
+ * The path can be a folder, a carbonindex file, or a carbondata file.
+ * Does not check that all files in the path share the same schema.
+ *
+ * @param path file or folder path
+ * @param conf Hadoop configuration used to access the path, e.g. to set the
+ * S3A access key, secret key, endpoint and other options
+ * @return schema
+ * @throws IOException
+ */
+ public static Schema readSchema(String path, Configuration conf);
+```
+
+```
+ /**
+ * Read the schema from a path.
+ * The path can be a folder, a carbonindex file, or a carbondata file.
+ * The caller decides whether to check that all files share the same schema.
+ *
+ * @param path file or folder path
+ * @param validateSchema whether to check that all files share the same schema
+ * @param conf Hadoop configuration used to access the path, e.g. to set the
+ * S3A access key, secret key, endpoint and other options
+ * @return schema
+ * @throws IOException
+ */
+ public static Schema readSchema(String path, boolean validateSchema, Configuration conf);
+```
+
+```
+ /**
* This method return the version details in formatted string by reading from carbondata file
* If application name is SDK_1.0.0 and this has written the carbondata file in carbondata 1.6 project version,
* then this API returns the String "SDK_1.0.0 in version: 1.6.0-SNAPSHOT"
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
index eb98798..f9eae9e 100644
--- a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.sdk.file.CarbonReader;
import org.apache.carbondata.sdk.file.CarbonWriter;
-import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3SchemaReadExample.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3SchemaReadExample.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3SchemaReadExample.java
new file mode 100644
index 0000000..2f2c768
--- /dev/null
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3SchemaReadExample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.sdk;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.sdk.file.CarbonSchemaReader;
+import org.apache.carbondata.sdk.file.Field;
+import org.apache.carbondata.sdk.file.Schema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Example that reads a carbon schema from S3 with CarbonSchemaReader, passing the
+ * S3A access key, secret key and endpoint through a Hadoop Configuration.
+ *
+ * Usage: SDKS3SchemaReadExample <access-key> <secret-key> <s3-endpoint> [table-path-on-s3]
+ */
+public class SDKS3SchemaReadExample {
+  public static void main(String[] args) throws Exception {
+    Logger logger = LogServiceFactory.getLogService(SDKS3SchemaReadExample.class.getName());
+    if (args == null || args.length < 3) {
+      logger.error("Usage: java SDKS3SchemaReadExample: <access-key> <secret-key> "
+          + "<s3-endpoint> [table-path-on-s3]");
+      // Missing arguments is an error, so report a non-zero exit status.
+      System.exit(1);
+    }
+
+    // The optional fourth argument overrides the default table path.
+    String path = args.length > 3 ? args[3] : "s3a://sdk/WriterOutput/carbondata2/";
+
+    Configuration configuration = new Configuration();
+    configuration.set(ACCESS_KEY, args[0]);
+    configuration.set(SECRET_KEY, args[1]);
+    configuration.set(ENDPOINT, args[2]);
+
+    // method 1: read the schema and check that all files share the same schema
+    printSchema(CarbonSchemaReader.readSchema(path, true, configuration));
+
+    // method 2: read the schema without checking every file
+    printSchema(CarbonSchemaReader.readSchema(path, configuration));
+  }
+
+  /**
+   * Prints the number of fields followed by one line per field.
+   *
+   * @param schema schema to print
+   */
+  private static void printSchema(Schema schema) {
+    System.out.println("Schema length is " + schema.getFieldsLength());
+    for (Field field : schema.getFields()) {
+      System.out.println(field + "\t");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/store/CSDK/CMakeLists.txt b/store/CSDK/CMakeLists.txt
index d35ebad..e4c506f 100644
--- a/store/CSDK/CMakeLists.txt
+++ b/store/CSDK/CMakeLists.txt
@@ -25,7 +25,7 @@ include_directories(${JNI_INCLUDE_DIRS})
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")
set(SOURCE_FILES src/CarbonReader.cpp src/CarbonReader.h test/main.cpp src/CarbonRow.h
src/CarbonRow.cpp src/CarbonWriter.h src/CarbonWriter.cpp src/CarbonSchemaReader.h
- src/CarbonSchemaReader.cpp src/Schema.h src/Schema.cpp src/CarbonProperties.cpp src/CarbonProperties.h)
+ src/CarbonSchemaReader.cpp src/Schema.h src/Schema.cpp src/CarbonProperties.cpp src/CarbonProperties.h src/Configuration.cpp src/Configuration.h)
add_executable(CSDK ${SOURCE_FILES})
get_filename_component(JAVA_JVM_LIBRARY_DIR ${JAVA_JVM_LIBRARY} DIRECTORY)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/src/CarbonSchemaReader.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonSchemaReader.cpp b/store/CSDK/src/CarbonSchemaReader.cpp
index 2743bfe..5fcd427 100644
--- a/store/CSDK/src/CarbonSchemaReader.cpp
+++ b/store/CSDK/src/CarbonSchemaReader.cpp
@@ -30,17 +30,23 @@ CarbonSchemaReader::CarbonSchemaReader(JNIEnv *env) {
}
jobject CarbonSchemaReader::readSchema(char *path) {
+    // Delegate to the Configuration overload with a newly created Java Configuration.
+    return readSchema(path, Configuration(jniEnv));
+}
+
+jobject CarbonSchemaReader::readSchema(char *path, Configuration conf) {
if (path == NULL) {
throw std::runtime_error("path parameter can't be NULL.");
}
jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
- "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+ "(Ljava/lang/String;Lorg/apache/hadoop/conf/Configuration;)Lorg/apache/carbondata/sdk/file/Schema;");
if (methodID == NULL) {
throw std::runtime_error("Can't find the method in java: readSchema");
}
jstring jPath = jniEnv->NewStringUTF(path);
- jvalue args[1];
+ jvalue args[2];
args[0].l = jPath;
+ args[1].l = conf.getConfigurationObject();
jobject result = jniEnv->CallStaticObjectMethodA(carbonSchemaReaderClass, methodID, args);
if (jniEnv->ExceptionCheck()) {
throw jniEnv->ExceptionOccurred();
@@ -48,22 +54,28 @@ jobject CarbonSchemaReader::readSchema(char *path) {
return result;
}
-jobject CarbonSchemaReader::readSchema(char *path, bool validateSchema) {
+/**
+ * Calls the Java CarbonSchemaReader.readSchema(String, boolean, Configuration)
+ * through JNI and returns the resulting Schema object.
+ *
+ * @param path carbon data path (folder, carbonindex file or carbondata file)
+ * @param validateSchema whether to check that all files share the same schema
+ * @param conf configuration whose Java object is passed to the reader
+ * @return the Java Schema object
+ */
+jobject CarbonSchemaReader::readSchema(char *path, bool validateSchema, Configuration conf) {
if (path == NULL) {
throw std::runtime_error("path parameter can't be NULL.");
}
jmethodID methodID = jniEnv->GetStaticMethodID(carbonSchemaReaderClass, "readSchema",
- "(Ljava/lang/String;)Lorg/apache/carbondata/sdk/file/Schema;");
+ "(Ljava/lang/String;ZLorg/apache/hadoop/conf/Configuration;)Lorg/apache/carbondata/sdk/file/Schema;");
if (methodID == NULL) {
throw std::runtime_error("Can't find the method in java: readSchema");
}
jstring jPath = jniEnv->NewStringUTF(path);
- jvalue args[2];
+ jvalue args[3];
args[0].l = jPath;
args[1].z = validateSchema;
+ args[2].l = conf.getConfigurationObject();
jobject result = jniEnv->CallStaticObjectMethodA(carbonSchemaReaderClass, methodID, args);
+    // Local references are only freed automatically when control returns to Java;
+    // callers here are presumably native code, so release the temporary path string.
+    jniEnv->DeleteLocalRef(jPath);
if (jniEnv->ExceptionCheck()) {
throw jniEnv->ExceptionOccurred();
}
return result;
}
+
+jobject CarbonSchemaReader::readSchema(char *path, bool validateSchema) {
+    // Same as the three-argument overload, using a newly created Java Configuration.
+    return readSchema(path, validateSchema, Configuration(jniEnv));
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/src/CarbonSchemaReader.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/CarbonSchemaReader.h b/store/CSDK/src/CarbonSchemaReader.h
index 2746773..5ea5f9f 100644
--- a/store/CSDK/src/CarbonSchemaReader.h
+++ b/store/CSDK/src/CarbonSchemaReader.h
@@ -16,6 +16,7 @@
*/
#include <jni.h>
+#include "Configuration.h"
class CarbonSchemaReader {
private:
@@ -50,6 +51,31 @@ public:
jobject readSchema(char *path);
/**
+ * read schema from path,
+ * path can be folder path, carbonindex file path, and carbondata file path
+ * and will not check all files schema
+ *
+ * @param path file/folder path
+ * @param conf configuration support, can set s3a AK,SK,
+ * end point and other conf with this
+ * @return schema
+ */
+ jobject readSchema(char *path, Configuration conf);
+
+ /**
+ * read schema from path,
+ * path can be folder path, carbonindex file path, and carbondata file path
+ * and user can decide whether check all files schema
+ *
+ * @param path carbon data path
+ * @param validateSchema whether check all files schema
+ * @param conf configuration support, can set s3a AK,SK,
+ * end point and other conf with this
+ * @return schema
+ */
+ jobject readSchema(char *path, bool validateSchema, Configuration conf);
+
+ /**
* read schema from path,
* path can be folder path, carbonindex file path, and carbondata file path
* and user can decide whether check all files schema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/src/Configuration.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/src/Configuration.cpp b/store/CSDK/src/Configuration.cpp
new file mode 100644
index 0000000..0f73fa7
--- /dev/null
+++ b/store/CSDK/src/Configuration.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdexcept>
+#include "Configuration.h"
+
+/**
+ * Creates a new org.apache.hadoop.conf.Configuration instance in the JVM
+ * reachable through env and keeps it for later set/get calls.
+ *
+ * @param env JNI env
+ * @throws std::runtime_error if env is NULL, or if the Java class, its
+ *         no-argument constructor, or the new instance cannot be obtained
+ */
+Configuration::Configuration(JNIEnv *env) {
+    if (env == NULL) {
+        throw std::runtime_error("env parameter can't be NULL.");
+    }
+    this->jniEnv = env;
+    configurationClass = env->FindClass("org/apache/hadoop/conf/Configuration");
+    if (configurationClass == NULL) {
+        throw std::runtime_error("Can't find the class in java: org/apache/hadoop/conf/Configuration");
+    }
+
+    initID = jniEnv->GetMethodID(configurationClass, "<init>", "()V");
+    if (initID == NULL) {
+        throw std::runtime_error("Can't find init id in java: org/apache/hadoop/conf/Configuration");
+    }
+    configurationObject = jniEnv->NewObject(configurationClass, initID);
+    // Validate the created instance (the class was already checked above);
+    // NewObject returns NULL when construction fails.
+    if (configurationObject == NULL) {
+        throw std::runtime_error("Can't create object in java: org/apache/hadoop/conf/Configuration");
+    }
+}
+
+/**
+ * Ensures both the Java class handle and the Java instance are available,
+ * throwing std::runtime_error naming whichever one is missing.
+ */
+void Configuration::check() {
+    const char *problem = NULL;
+    if (configurationClass == NULL) {
+        problem = "configurationClass can't be NULL. Please init first.";
+    } else if (configurationObject == NULL) {
+        problem = "configurationObject can't be NULL. Please init first.";
+    }
+    if (problem != NULL) {
+        throw std::runtime_error(problem);
+    }
+}
+
+/**
+ * Stores key=value in the wrapped Java Configuration by calling its
+ * set(String, String) method through JNI.
+ *
+ * @param key key word, must not be NULL
+ * @param value value, must not be NULL
+ * @throws std::runtime_error on NULL arguments or a missing method
+ * @throws jthrowable if the Java call raises an exception
+ */
+void Configuration::set(char *key, char *value) {
+    if (key == NULL) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (value == NULL) {
+        throw std::runtime_error("value parameter can't be NULL.");
+    }
+    check();
+
+    // Resolve the method id once and cache it for later calls.
+    if (setID == NULL) {
+        setID = jniEnv->GetMethodID(configurationClass, "set",
+            "(Ljava/lang/String;Ljava/lang/String;)V");
+        if (setID == NULL) {
+            throw std::runtime_error("Can't find the method in java: set");
+        }
+    }
+
+    jstring jKey = jniEnv->NewStringUTF(key);
+    jstring jValue = jniEnv->NewStringUTF(value);
+    jvalue args[2];
+    args[0].l = jKey;
+    args[1].l = jValue;
+    // set has signature (...)V, so it must be invoked via the void call variant.
+    jniEnv->CallVoidMethodA(configurationObject, setID, args);
+    // Release the temporary strings so repeated calls do not pile up local refs.
+    jniEnv->DeleteLocalRef(jKey);
+    jniEnv->DeleteLocalRef(jValue);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+}
+
+
+/**
+ * Reads the value stored under key from the wrapped Java Configuration by
+ * calling its get(String, String) method through JNI.
+ *
+ * @param key key word, must not be NULL
+ * @param defaultValue value returned by Java when key is unset, must not be NULL
+ * @return a new[]-allocated, NUL-terminated copy of the value in modified UTF-8,
+ *         owned by the caller (release with delete[]), or NULL if Java returned null
+ * @throws std::runtime_error on NULL arguments or a missing method
+ * @throws jthrowable if the Java call raises an exception
+ */
+char *Configuration::get(char *key, char *defaultValue) {
+    if (key == NULL) {
+        throw std::runtime_error("key parameter can't be NULL.");
+    }
+    if (defaultValue == NULL) {
+        throw std::runtime_error("defaultValue parameter can't be NULL.");
+    }
+
+    check();
+
+    // Resolve the method id once and cache it for later calls.
+    if (getID == NULL) {
+        getID = jniEnv->GetMethodID(configurationClass, "get",
+            "(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;");
+        if (getID == NULL) {
+            throw std::runtime_error("Can't find the method in java: get");
+        }
+    }
+
+    jstring jKey = jniEnv->NewStringUTF(key);
+    jstring jDefault = jniEnv->NewStringUTF(defaultValue);
+    jvalue args[2];
+    args[0].l = jKey;
+    args[1].l = jDefault;
+
+    jstring result = (jstring) jniEnv->CallObjectMethodA(configurationObject, getID, args);
+    jniEnv->DeleteLocalRef(jKey);
+    jniEnv->DeleteLocalRef(jDefault);
+    if (jniEnv->ExceptionCheck()) {
+        throw jniEnv->ExceptionOccurred();
+    }
+    if (result == NULL) {
+        return NULL;
+    }
+
+    // Copy into native memory before dropping the Java reference: a buffer from
+    // GetStringUTFChars must be released with ReleaseStringUTFChars on the same
+    // jstring, which becomes impossible once that reference is deleted.
+    jsize utfLength = jniEnv->GetStringUTFLength(result);
+    char *copy = new char[utfLength + 1];
+    jniEnv->GetStringUTFRegion(result, 0, jniEnv->GetStringLength(result), copy);
+    copy[utfLength] = '\0';
+    jniEnv->DeleteLocalRef(result);
+    return copy;
+}
+
+/**
+ * Returns the wrapped Java Configuration instance, e.g. for passing it as a
+ * JNI argument to CarbonSchemaReader.readSchema.
+ */
+jobject Configuration::getConfigurationObject() {
+    return this->configurationObject;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/src/Configuration.h
----------------------------------------------------------------------
diff --git a/store/CSDK/src/Configuration.h b/store/CSDK/src/Configuration.h
new file mode 100644
index 0000000..4f7b518
--- /dev/null
+++ b/store/CSDK/src/Configuration.h
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef CARBONDATA_CSDK_CONFIGURATION_H
+#define CARBONDATA_CSDK_CONFIGURATION_H
+
+#include <jni.h>
+
+/**
+ * Native wrapper around a Java org.apache.hadoop.conf.Configuration object,
+ * used to pass settings such as S3A credentials and endpoint to the Java SDK.
+ */
+class Configuration {
+private:
+    /**
+     * configuration class, used to create the object and look up method ids
+     */
+    jclass configurationClass = NULL;
+    /**
+     * configuration object on which set/get are invoked
+     */
+    jobject configurationObject = NULL;
+    /**
+     * method id of the no-argument Configuration constructor
+     */
+    jmethodID initID = NULL;
+    /**
+     * cached method id of Configuration.set(String, String)
+     */
+    jmethodID setID = NULL;
+    /**
+     * cached method id of Configuration.get(String, String)
+     */
+    jmethodID getID = NULL;
+
+    /**
+     * check that the configuration class and object are initialized
+     */
+    void check();
+
+public:
+
+    /**
+     * jni env
+     */
+    JNIEnv *jniEnv;
+
+    /**
+     * Constructor: creates a new Java Configuration object through env
+     *
+     * @param env JNI env
+     */
+    Configuration(JNIEnv *env);
+
+    /**
+     * set a configuration parameter, e.g. S3A access key, secret key or endpoint
+     *
+     * @param key key word
+     * @param value value
+     */
+    void set(char *key, char *value);
+
+    /**
+     * get a parameter value from the configuration
+     *
+     * @param key key word
+     * @param defaultValue value returned when key is not set
+     * @return parameter value
+     */
+    char *get(char *key, char *defaultValue);
+
+    /**
+     * get the underlying Java configuration object
+     *
+     * @return configuration object
+     */
+    jobject getConfigurationObject();
+};
+
+#endif // CARBONDATA_CSDK_CONFIGURATION_H
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/CSDK/test/main.cpp
----------------------------------------------------------------------
diff --git a/store/CSDK/test/main.cpp b/store/CSDK/test/main.cpp
index 140420b..76c0e35 100644
--- a/store/CSDK/test/main.cpp
+++ b/store/CSDK/test/main.cpp
@@ -176,15 +176,24 @@ void printResult(JNIEnv *env, CarbonReader reader) {
* @param env jni env
* @return whether it is success
*/
-bool readSchema(JNIEnv *env, char *Path, bool validateSchema) {
- printf("\nread Schema from Index File:\n");
+bool readSchema(JNIEnv *env, char *Path, bool validateSchema, char **argv, int argc) {
try {
+ printf("\nread Schema:\n");
+ Configuration conf(env);
+ if (argc > 3) {
+ conf.set("fs.s3a.access.key", argv[1]);
+ conf.set("fs.s3a.secret.key", argv[2]);
+ conf.set("fs.s3a.endpoint", argv[3]);
+ }
+ printf("%s\n", conf.get("fs.s3a.endpoint", "default"));
+
CarbonSchemaReader carbonSchemaReader(env);
jobject schema;
+
if (validateSchema) {
- schema = carbonSchemaReader.readSchema(Path, validateSchema);
+ schema = carbonSchemaReader.readSchema(Path, validateSchema, conf);
} else {
- schema = carbonSchemaReader.readSchema(Path);
+ schema = carbonSchemaReader.readSchema(Path, conf);
}
Schema carbonSchema(env, schema);
int length = carbonSchema.getFieldsLength();
@@ -830,8 +839,9 @@ int main(int argc, char *argv[]) {
char *S3Path = "s3a://csdk/bigData/i400bs128";
if (argc > 3) {
- // TODO: need support read schema from S3 in the future
testWriteData(env, S3WritePath, 4, argv);
+ readSchema(env, S3WritePath, true, argv,4);
+ readSchema(env, S3WritePath, false, argv, 4);
readFromS3(env, S3ReadPath, argv);
testWithTableProperty(env, "s3a://csdk/dataProperty", 4, argv);
testSortBy(env, "s3a://csdk/dataSort", 4, argv);
@@ -851,10 +861,10 @@ int main(int argc, char *argv[]) {
testWriteData(env, "./dataLoadOption", 1, argv);
readFromLocalWithoutProjection(env, smallFilePath);
readFromLocalWithProjection(env, smallFilePath);
- readSchema(env, path, false);
- readSchema(env, path, true);
testWithTableProperty(env, "./dataProperty", 1, argv);
testSortBy(env, "./dataSort", 1, argv);
+ readSchema(env, path, false, argv, 1);
+ readSchema(env, path, true, argv, 1);
testReadNextRow(env, path, printNum, argv, 0, true);
testReadNextRow(env, path, printNum, argv, 0, false);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/96cc1a6a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index aadd13a..cde609b 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -42,6 +42,8 @@ import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWra
import static org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT;
import static org.apache.carbondata.core.util.path.CarbonTablePath.INDEX_FILE_EXT;
+import org.apache.hadoop.conf.Configuration;
+
/**
* Schema reader for carbon files, including carbondata file, carbonindex file, and schema file
*/
@@ -68,15 +70,18 @@ public class CarbonSchemaReader {
/**
* get carbondata/carbonindex file in path
*
- * @param path carbon file path
+ * @param path carbon file path
+ * @param extension carbon file extension
+ * @param conf hadoop configuration support, can set s3a AK,SK,
+ * end point and other conf with this
* @return CarbonFile array
*/
- private static CarbonFile[] getCarbonFile(String path, final String extension)
+ private static CarbonFile[] getCarbonFile(String path, final String extension, Configuration conf)
throws IOException {
String dataFilePath = path;
if (!(dataFilePath.endsWith(extension))) {
CarbonFile[] carbonFiles = FileFactory
- .getCarbonFile(path)
+ .getCarbonFile(path, conf)
.listFiles(new CarbonFileFilter() {
@Override
public boolean accept(CarbonFile file) {
@@ -106,7 +111,22 @@ public class CarbonSchemaReader {
* @throws IOException
*/
public static Schema readSchema(String path) throws IOException {
- return readSchema(path, false);
+    // No schema validation; a newly created Hadoop Configuration is used.
+    return readSchema(path, false, new Configuration());
+  }
+
+  /**
+   * Reads the schema from a folder, carbonindex file or carbondata file path
+   * without checking that all files share the same schema.
+   *
+   * @param path file/folder path
+   * @param conf Hadoop configuration used to access the path; S3A access key,
+   *             secret key, endpoint and other settings can be supplied here
+   * @return schema
+   * @throws IOException
+   */
+  public static Schema readSchema(String path, Configuration conf) throws IOException {
+    final boolean validateSchema = false;
+    return readSchema(path, validateSchema, conf);
}
/**
@@ -114,30 +134,33 @@ public class CarbonSchemaReader {
* path can be folder path, carbonindex file path, and carbondata file path
* and user can decide whether check all files schema
*
- * @param path file/folder path
+ * @param path file/folder path
* @param validateSchema whether check all files schema
+ * @param conf hadoop configuration support, can set s3a AK,SK,
+ * end point and other conf with this
* @return schema
* @throws IOException
*/
- public static Schema readSchema(String path, boolean validateSchema) throws IOException {
+ public static Schema readSchema(String path, boolean validateSchema, Configuration conf)
+ throws IOException {
if (path.endsWith(INDEX_FILE_EXT)) {
- return readSchemaFromIndexFile(path);
+ return readSchemaFromIndexFile(path, conf);
} else if (path.endsWith(CARBON_DATA_EXT)) {
- return readSchemaFromDataFile(path);
+ return readSchemaFromDataFile(path, conf);
} else if (validateSchema) {
- CarbonFile[] carbonIndexFiles = getCarbonFile(path, INDEX_FILE_EXT);
+ CarbonFile[] carbonIndexFiles = getCarbonFile(path, INDEX_FILE_EXT, conf);
Schema schema;
if (carbonIndexFiles != null && carbonIndexFiles.length != 0) {
- schema = readSchemaFromIndexFile(carbonIndexFiles[0].getAbsolutePath());
+ schema = readSchemaFromIndexFile(carbonIndexFiles[0].getAbsolutePath(), conf);
for (int i = 1; i < carbonIndexFiles.length; i++) {
- Schema schema2 = readSchemaFromIndexFile(carbonIndexFiles[i].getAbsolutePath());
+ Schema schema2 = readSchemaFromIndexFile(carbonIndexFiles[i].getAbsolutePath(), conf);
if (!schema.equals(schema2)) {
throw new CarbonDataLoadingException("Schema is different between different files.");
}
}
- CarbonFile[] carbonDataFiles = getCarbonFile(path, CARBON_DATA_EXT);
+ CarbonFile[] carbonDataFiles = getCarbonFile(path, CARBON_DATA_EXT, conf);
for (int i = 0; i < carbonDataFiles.length; i++) {
- Schema schema2 = readSchemaFromDataFile(carbonDataFiles[i].getAbsolutePath());
+ Schema schema2 = readSchemaFromDataFile(carbonDataFiles[i].getAbsolutePath(), conf);
if (!schema.equals(schema2)) {
throw new CarbonDataLoadingException("Schema is different between different files.");
}
@@ -147,12 +170,27 @@ public class CarbonSchemaReader {
throw new CarbonDataLoadingException("No carbonindex file in this path.");
}
} else {
- String indexFilePath = getCarbonFile(path, INDEX_FILE_EXT)[0].getAbsolutePath();
- return readSchemaFromIndexFile(indexFilePath);
+ String indexFilePath = getCarbonFile(path, INDEX_FILE_EXT, conf)[0].getAbsolutePath();
+ return readSchemaFromIndexFile(indexFilePath, conf);
}
}
/**
+   * Reads the schema from a folder, carbonindex file or carbondata file path;
+   * the caller decides whether all files are checked for the same schema.
+   * A newly created Hadoop Configuration is used to access the path.
+   *
+   * @param path file/folder path
+   * @param validateSchema whether to check that all files share the same schema
+   * @return schema
+   * @throws IOException
+   */
+  public static Schema readSchema(String path, boolean validateSchema) throws IOException {
+    return readSchema(path, validateSchema, new Configuration());
+  }
+
+ /**
* Read carbondata file and return the schema
* This interface will be removed,
* please use readSchema instead of this interface
@@ -170,11 +208,14 @@ public class CarbonSchemaReader {
* Read schema from carbondata file
*
* @param dataFilePath carbondata file path
+ * @param conf hadoop configuration support, can set s3a AK,SK,
+ * end point and other conf with this
* @return carbon data schema
* @throws IOException
*/
- public static Schema readSchemaFromDataFile(String dataFilePath) throws IOException {
- CarbonHeaderReader reader = new CarbonHeaderReader(dataFilePath);
+ private static Schema readSchemaFromDataFile(String dataFilePath, Configuration conf)
+ throws IOException {
+ CarbonHeaderReader reader = new CarbonHeaderReader(dataFilePath, conf);
List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
List<ColumnSchema> schemaList = reader.readSchema();
for (int i = 0; i < schemaList.size(); i++) {
@@ -204,18 +245,21 @@ public class CarbonSchemaReader {
* Read schema from carbonindex file
*
* @param indexFilePath carbonindex file path
+ * @param conf hadoop configuration support, can set s3a AK,SK,
+ * end point and other conf with this
* @return carbon data Schema
* @throws IOException
*/
- private static Schema readSchemaFromIndexFile(String indexFilePath) throws IOException {
+ private static Schema readSchemaFromIndexFile(String indexFilePath, Configuration conf)
+ throws IOException {
CarbonFile indexFile =
- FileFactory.getCarbonFile(indexFilePath, FileFactory.getFileType(indexFilePath));
+ FileFactory.getCarbonFile(indexFilePath, conf);
if (!indexFile.getName().endsWith(INDEX_FILE_EXT)) {
throw new IOException("Not an index file name");
}
// read schema from the first index file
DataInputStream dataInputStream =
- FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath));
+ FileFactory.getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), conf);
byte[] bytes = new byte[(int) indexFile.getSize()];
try {
//get the file in byte buffer