You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/27 10:16:39 UTC

carbondata git commit: [CARBONDATA-2384] SDK support write/read data into/from S3

Repository: carbondata
Updated Branches:
  refs/heads/master fae457a35 -> 242c08be4


[CARBONDATA-2384] SDK support write/read data into/from S3

Users can set their S3 credentials in the SDK and use the SDK to write data to and read data from S3.

This closes #2226


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/242c08be
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/242c08be
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/242c08be

Branch: refs/heads/master
Commit: 242c08be452ada59397ceaa906a568b35825b5f3
Parents: fae457a
Author: xubo245 <60...@qq.com>
Authored: Wed Apr 25 15:09:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Apr 27 18:16:22 2018 +0800

----------------------------------------------------------------------
 .../carbondata/examples/sdk/SDKS3Example.java   | 103 +++++++++++++++++++
 store/sdk/pom.xml                               |   5 +
 .../sdk/file/CarbonReaderBuilder.java           |  69 +++++++++++++
 .../sdk/file/CarbonWriterBuilder.java           |  69 +++++++++++++
 4 files changed, 246 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
new file mode 100644
index 0000000..60aa1f8
--- /dev/null
+++ b/examples/spark2/src/main/java/org/apache/carbondata/examples/sdk/SDKS3Example.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples.sdk;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.sdk.file.*;
+
+/**
+ * Example for writing data to S3 with CarbonWriter and reading it back with CarbonReader.
+ */
+public class SDKS3Example {
+    public static void main(String[] args) throws Exception {
+        LogService logger = LogServiceFactory.getLogService(SDKS3Example.class.getName());
+        if (args == null || args.length < 3) {
+            logger.error("Usage: java SDKS3Example <access-key> <secret-key> <s3-endpoint> " +
+                    "[table-path-on-s3] [rows] [persistSchema] [transactionalTable]");
+            // Non-zero status so callers can tell a usage error from a successful run.
+            System.exit(1);
+        }
+
+        String path = "s3a://sdk/WriterOutput";
+        if (args.length > 3) {
+            path = args[3];
+        }
+
+        // Number of rows to write.
+        int num = 3;
+        if (args.length > 4) {
+            num = Integer.parseInt(args[4]);
+        }
+
+        // parseBoolean is true only for a case-insensitive "true", as before.
+        boolean persistSchema = true;
+        if (args.length > 5) {
+            persistSchema = Boolean.parseBoolean(args[5]);
+        }
+
+        boolean transactionalTable = true;
+        if (args.length > 6) {
+            transactionalTable = Boolean.parseBoolean(args[6]);
+        }
+
+        Field[] fields = new Field[2];
+        fields[0] = new Field("name", DataTypes.STRING);
+        fields[1] = new Field("age", DataTypes.INT);
+        CarbonWriterBuilder builder = CarbonWriter.builder()
+                .withSchema(new Schema(fields))
+                .setAccessKey(args[0])
+                .setSecretKey(args[1])
+                .setEndPoint(args[2])
+                .outputPath(path)
+                .persistSchemaFile(persistSchema)
+                .isTransactionalTable(transactionalTable);
+
+        CarbonWriter writer = builder.buildWriterForCSVInput();
+        try {
+            for (int i = 0; i < num; i++) {
+                writer.write(new String[]{"robot" + (i % 10), String.valueOf(i)});
+            }
+        } finally {
+            // Close even if a write fails so the writer is not leaked.
+            writer.close();
+        }
+
+        // Read the data back, passing the same S3 credentials to the reader.
+        CarbonReader reader = CarbonReader
+                .builder(path, "_temp")
+                .projection(new String[]{"name", "age"})
+                .setAccessKey(args[0])
+                .setSecretKey(args[1])
+                .setEndPoint(args[2])
+                .build();
+
+        System.out.println("\nData:");
+        // Print at most the first 20 rows.
+        int i = 0;
+        while (i < 20 && reader.hasNext()) {
+            Object[] row = (Object[]) reader.readNextRow();
+            System.out.println(row[0] + " " + row[1]);
+            i++;
+        }
+        System.out.println("\nFinished");
+        // TODO(review): the reader is never closed; reader.close() was left disabled here.
+        //        reader.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index a7869e4..af0d079 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -40,6 +40,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
index 7f00b49..9560ef7 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java
@@ -24,12 +24,14 @@ import java.util.Objects;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.hadoop.CarbonProjection;
 import org.apache.carbondata.hadoop.api.CarbonFileInputFormat;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
@@ -64,6 +66,73 @@ public class CarbonReaderBuilder {
     return this;
   }
 
+  /**
+   * Sets the S3 access key under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.access.key
+   * @param value the access key
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setAccessKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 access key under the S3A property {@link Constants#ACCESS_KEY}.
+   *
+   * @param value the access key
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setAccessKey(String value) {
+    return setAccessKey(Constants.ACCESS_KEY, value);
+  }
+
+  /**
+   * Sets the S3 secret key under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.secret.key
+   * @param value the secret key
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setSecretKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 secret key under the S3A property {@link Constants#SECRET_KEY}.
+   *
+   * @param value the secret key
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setSecretKey(String value) {
+    return setSecretKey(Constants.SECRET_KEY, value);
+  }
+
+  /**
+   * Sets the S3 endpoint under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.endpoint
+   * @param value the endpoint
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setEndPoint(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 endpoint under the S3A property {@link Constants#ENDPOINT}.
+   *
+   * @param value the endpoint
+   * @return this CarbonReaderBuilder
+   */
+  public CarbonReaderBuilder setEndPoint(String value) {
+    // Delegate like setAccessKey/setSecretKey instead of duplicating the set() call.
+    return setEndPoint(Constants.ENDPOINT, value);
+  }
+
   public <T> CarbonReader<T> build() throws IOException, InterruptedException {
     CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/242c08be/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 3e5f814..5f5ee6f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -44,6 +44,8 @@ import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 
+import org.apache.hadoop.fs.s3a.Constants;
+
 /**
  * Biulder for {@link CarbonWriter}
  */
@@ -131,6 +133,73 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Sets the S3 access key under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.access.key
+   * @param value the access key
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setAccessKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 access key under the S3A property {@link Constants#ACCESS_KEY}.
+   *
+   * @param value the access key
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setAccessKey(String value) {
+    return setAccessKey(Constants.ACCESS_KEY, value);
+  }
+
+  /**
+   * Sets the S3 secret key under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.secret.key
+   * @param value the secret key
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setSecretKey(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 secret key under the S3A property {@link Constants#SECRET_KEY}.
+   *
+   * @param value the secret key
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setSecretKey(String value) {
+    return setSecretKey(Constants.SECRET_KEY, value);
+  }
+
+  /**
+   * Sets the S3 endpoint under property {@code key}.
+   * The value goes into FileFactory.getConfiguration(), not into state held by this builder.
+   * @param key   property name used by the S3 client, e.g. fs.s3a.endpoint
+   * @param value the endpoint
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setEndPoint(String key, String value) {
+    FileFactory.getConfiguration().set(key, value);
+    return this;
+  }
+
+  /**
+   * Sets the S3 endpoint under the S3A property {@link Constants#ENDPOINT}.
+   *
+   * @param value the endpoint
+   * @return this CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder setEndPoint(String value) {
+    // Delegate like setAccessKey/setSecretKey instead of duplicating the set() call.
+    return setEndPoint(Constants.ENDPOINT, value);
+  }
+
+  /**
    * to set the timestamp in the carbondata and carbonindex index files
    * @param UUID is a timestamp to be used in the carbondata and carbonindex index files
    * @return updated CarbonWriterBuilder