You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/27 10:16:39 UTC
carbondata git commit: [CARBONDATA-2384] SDK support write/read data into/from S3
Repository: carbondata
Updated Branches:
refs/heads/master fae457a35 -> 242c08be4
[CARBONDATA-2384] SDK support write/read data into/from S3
Users can set their S3 credentials in the SDK and then use the SDK to write data to, and read data from, S3.
This closes #2226
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/242c08be
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/242c08be
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/242c08be
Branch: refs/heads/master
Commit: 242c08be452ada59397ceaa906a568b35825b5f3
Parents: fae457a
Author: xubo245 <60...@qq.com>
Authored: Wed Apr 25 15:09:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 27 18:16:22 2018 +0800
----------------------------------------------------------------------
.../carbondata/examples/sdk/SDKS3Example.java | 103 +++++++++++++++++++
store/sdk/pom.xml | 5 +
.../sdk/file/CarbonReaderBuilder.java | 69 +++++++++++++
.../sdk/file/CarbonWriterBuilder.java | 69 +++++++++++++
4 files changed, 246 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
new file mode 100644
index 0000000..60aa1f8
--- /dev/null
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.sdk;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.*;
+
+/**
+ * Example that writes a few rows to S3 with {@link CarbonWriter} and reads them back with
+ * {@link CarbonReader}, passing the S3 credentials through the SDK builders.
+ *
+ * <p>Arguments (positional):
+ * {@code <access-key> <secret-key> <s3-endpoint> [table-path-on-s3] [rows] [persistSchema]
+ * [transactionalTable]}. Defaults: path {@code s3a://sdk/WriterOutput}, 3 rows, and
+ * {@code true} for both flags. A flag is {@code true} only when it equals "true",
+ * ignoring case.
+ */
+public class SDKS3Example {
+  public static void main(String[] args) throws Exception {
+    LogService logger = LogServiceFactory.getLogService(SDKS3Example.class.getName());
+    if (args == null || args.length < 3) {
+      // The usage text must match the positional parsing below (args[4] is the row count).
+      logger.error("Usage: java SDKS3Example: <access-key> <secret-key> " +
+          "<s3-endpoint> [table-path-on-s3] [rows] [persistSchema] [transactionalTable]");
+      // Missing mandatory arguments is a failure, so report it with a non-zero status.
+      System.exit(1);
+    }
+
+    String path = "s3a://sdk/WriterOutput";
+    if (args.length > 3) {
+      path = args[3];
+    }
+
+    int num = 3;
+    if (args.length > 4) {
+      num = Integer.parseInt(args[4]);
+    }
+
+    // Boolean.parseBoolean is true only for "true" (any case), like the original check.
+    boolean persistSchema = true;
+    if (args.length > 5) {
+      persistSchema = Boolean.parseBoolean(args[5]);
+    }
+
+    boolean transactionalTable = true;
+    if (args.length > 6) {
+      transactionalTable = Boolean.parseBoolean(args[6]);
+    }
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+    CarbonWriterBuilder builder = CarbonWriter.builder()
+        .withSchema(new Schema(fields))
+        .setAccessKey(args[0])
+        .setSecretKey(args[1])
+        .setEndPoint(args[2])
+        .outputPath(path)
+        .persistSchemaFile(persistSchema)
+        .isTransactionalTable(transactionalTable);
+
+    CarbonWriter writer = builder.buildWriterForCSVInput();
+
+    // Each row is passed as CSV-style strings: name cycles robot0..robot9, age is the index.
+    for (int i = 0; i < num; i++) {
+      writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+    }
+    writer.close();
+
+    // Read the data back from the same path, supplying the same S3 credentials.
+    CarbonReader reader = CarbonReader
+        .builder(path, "_temp")
+        .projection(new String[]{"name", "age"})
+        .setAccessKey(args[0])
+        .setSecretKey(args[1])
+        .setEndPoint(args[2])
+        .build();
+
+    System.out.println("\nData:");
+    // Print at most the first 20 rows.
+    int i = 0;
+    while (i < 20 && reader.hasNext()) {
+      Object[] row = (Object[]) reader.readNextRow();
+      System.out.println(row[0] + " " + row[1]);
+      i++;
+    }
+    System.out.println("\nFinished");
+    // TODO close the reader (the call below was left disabled by the original author).
+    // reader.close();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index a7869e4..af0d079 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -40,6 +40,11 @@
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <!-- Needed for org.apache.hadoop.fs.s3a.Constants, which the SDK reader/writer builders use for the S3 access-key, secret-key and endpoint property names. -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 7f00b49..9560ef7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -24,12 +24,14 @@ import java.util.Objects;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.hadoop.CarbonProjection;
import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
@@ -64,6 +66,73 @@ public class CarbonReaderBuilder {
return this;
}
+  /**
+   * Sets an S3 access key under a caller-chosen configuration property.
+   *
+   * <p>The pair is written to {@code FileFactory.getConfiguration()}; it is not stored on
+   * this builder. NOTE(review): that configuration looks process-wide, so the value
+   * probably also affects other readers/writers in the same JVM. Confirm.
+   *
+   * @param key   name of the access-key property for the S3 client in use,
+   *              for example {@code fs.s3a.access.key}
+   * @param value the access key
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setAccessKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3A access key, stored under the {@link Constants#ACCESS_KEY} property
+   * ({@code fs.s3a.access.key}).
+   *
+   * @param value the access key
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setAccessKey(String value) {
+    return setAccessKey(Constants.ACCESS_KEY, value);
+  }
+
+  /**
+   * Sets an S3 secret key under a caller-chosen configuration property.
+   *
+   * <p>The pair is written to {@code FileFactory.getConfiguration()}; it is not stored on
+   * this builder. NOTE(review): that configuration looks process-wide, so the value
+   * probably also affects other readers/writers in the same JVM. Confirm.
+   *
+   * @param key   name of the secret-key property for the S3 client in use,
+   *              for example {@code fs.s3a.secret.key}
+   * @param value the secret key
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setSecretKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3A secret key, stored under the {@link Constants#SECRET_KEY} property
+   * ({@code fs.s3a.secret.key}).
+   *
+   * @param value the secret key
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setSecretKey(String value) {
+    return setSecretKey(Constants.SECRET_KEY, value);
+  }
+
+  /**
+   * Sets an S3 endpoint under a caller-chosen configuration property.
+   *
+   * <p>The pair is written to {@code FileFactory.getConfiguration()}; it is not stored on
+   * this builder. NOTE(review): that configuration looks process-wide, so the value
+   * probably also affects other readers/writers in the same JVM. Confirm.
+   *
+   * @param key   name of the endpoint property for the S3 client in use,
+   *              for example {@code fs.s3a.endpoint}
+   * @param value the endpoint
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setEndPoint(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3A endpoint, stored under the {@link Constants#ENDPOINT} property
+   * ({@code fs.s3a.endpoint}).
+   *
+   * @param value the endpoint
+   * @return this {@code CarbonReaderBuilder}, for call chaining
+   */
+  public CarbonReaderBuilder setEndPoint(String value) {
+    // Delegate to the two-argument overload, as setAccessKey/setSecretKey do.
+    return setEndPoint(Constants.ENDPOINT, value);
+  }
+
public <T> CarbonReader<T> build() throws IOException, InterruptedException {
CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 3e5f814..5f5ee6f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -44,6 +44,8 @@ import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
+import org.apache.hadoop.fs.s3a.Constants;
+
/**
* Biulder for {@link CarbonWriter}
*/
@@ -131,6 +133,73 @@ public class CarbonWriterBuilder {
}
/**
+ * Set the access key for S3
+ *
+ * @param key the string of access key for different S3 type,like: fs.s3a.access.key
+ * @param value the value of access key
+ * @return CarbonWriterBuilder
+ */
+ public CarbonWriterBuilder setAccessKey(String key, String value) {
+ FileFactory.getConfiguration().set(key, value);
+ return this;
+ }
+
+  /**
+   * Configures the S3A access key under {@link Constants#ACCESS_KEY}
+   * ({@code fs.s3a.access.key}).
+   *
+   * @param value the access key
+   * @return this CarbonWriterBuilder, for chaining
+   */
+  public CarbonWriterBuilder setAccessKey(String value) {
+    setAccessKey(Constants.ACCESS_KEY, value);
+    return this;
+  }
+
+  /**
+   * Stores an S3 secret key under the given Hadoop configuration property.
+   *
+   * <p>Nothing is kept on the builder itself; the pair is written straight into
+   * {@code FileFactory.getConfiguration()}.
+   *
+   * @param key   configuration property that holds the secret key for the S3 client
+   *              in use, e.g. {@code fs.s3a.secret.key}
+   * @param value the secret key
+   * @return this CarbonWriterBuilder, for chaining
+   */
+  public CarbonWriterBuilder setSecretKey(String key, String value) {
+    FileFactory.getConfiguration()
+        .set(key, value);
+    return this;
+  }
+
+  /**
+   * Configures the S3A secret key under {@link Constants#SECRET_KEY}
+   * ({@code fs.s3a.secret.key}).
+   *
+   * @param value the secret key
+   * @return this CarbonWriterBuilder, for chaining
+   */
+  public CarbonWriterBuilder setSecretKey(String value) {
+    setSecretKey(Constants.SECRET_KEY, value);
+    return this;
+  }
+
+  /**
+   * Stores an S3 endpoint under the given Hadoop configuration property.
+   *
+   * <p>Nothing is kept on the builder itself; the pair is written straight into
+   * {@code FileFactory.getConfiguration()}.
+   *
+   * @param key   configuration property that holds the endpoint for the S3 client
+   *              in use, e.g. {@code fs.s3a.endpoint}
+   * @param value the endpoint
+   * @return this CarbonWriterBuilder, for chaining
+   */
+  public CarbonWriterBuilder setEndPoint(String key, String value) {
+    FileFactory.getConfiguration()
+        .set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3A endpoint, stored under the {@link Constants#ENDPOINT} property
+   * ({@code fs.s3a.endpoint}).
+   *
+   * @param value the endpoint
+   * @return this CarbonWriterBuilder, for chaining
+   */
+  public CarbonWriterBuilder setEndPoint(String value) {
+    // Delegate to the two-argument overload, as setAccessKey/setSecretKey do.
+    return setEndPoint(Constants.ENDPOINT, value);
+  }
+
+ /**
* to set the timestamp in the carbondata and carbonindex index files
* @param UUID is a timestamp to be used in the carbondata and carbonindex index files
* @return updated CarbonWriterBuilder