Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/23 08:53:15 UTC

carbondata git commit: [CARBONDATA-2338][Test] Add example to upload data to S3 by using SDK

Repository: carbondata
Updated Branches:
  refs/heads/master 8b33ab240 -> 42bf13719


[CARBONDATA-2338][Test] Add example to upload data to S3 by using SDK

Add an example that writes CarbonData files into S3 using the SDK

This closes #2165
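
In outline, the new example drives the SDK writer roughly as follows (a trimmed
sketch based on the code added below; the S3 path is a placeholder, and the
transactional-table, unique-identifier and block-size settings used by the full
example are omitted here):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

    val fields = Array(
      new Field("name", DataTypes.STRING),
      new Field("age", DataTypes.INT),
      new Field("height", DataTypes.DOUBLE))

    // build a CSV-input writer that emits carbondata files under the given path
    val writer = CarbonWriter.builder()
      .withSchema(new Schema(fields))
      .outputPath("s3a://your-bucket/WriterOutput")   // placeholder bucket/path
      .buildWriterForCSVInput()

    writer.write(Array[String]("robot0", "0", "0.0"))
    writer.close()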


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/42bf1371
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/42bf1371
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/42bf1371

Branch: refs/heads/master
Commit: 42bf1371919a8d13cf94c26a546d681b354af893
Parents: 8b33ab2
Author: xubo245 <60...@qq.com>
Authored: Thu Apr 12 17:12:38 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Apr 23 16:52:59 2018 +0800

----------------------------------------------------------------------
 examples/spark2/pom.xml                         |   5 +
 .../carbondata/examples/S3UsingSDkExample.scala | 151 +++++++++++++++++++
 2 files changed, 156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/42bf1371/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index e562fbf..196bc16 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -45,6 +45,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
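
The carbondata-store-sdk dependency added above is what makes the SDK writer
classes available to the examples module; the new example file below relies on
an import along these lines:

    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}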

http://git-wip-us.apache.org/repos/asf/carbondata/blob/42bf1371/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
new file mode 100644
index 0000000..7ecde88
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.examples
+
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
+import org.apache.spark.sql.SparkSession
+import org.slf4j.{Logger, LoggerFactory}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+
+/**
+ * Generate data and write it to S3 using the SDK.
+ * Users can control how many rows are generated via the number-of-rows parameter.
+ */
+object S3UsingSDKExample {
+
+  // prepare SDK writer output
+  def buildTestData(
+      path: String,
+      num: Int = 3,
+      persistSchema: Boolean = false): Any = {
+
+    // getCanonicalPath may return backslashes on Windows, but the writer expects forward slashes
+    val writerPath = path.replace("\\", "/")
+
+    val fields: Array[Field] = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.INT)
+    fields(2) = new Field("height", DataTypes.DOUBLE)
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(new Schema(fields)).outputPath(writerPath).isTransactionalTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis)
+            .buildWriterForCSVInput()
+        } else {
+          builder.withSchema(new Schema(fields)).outputPath(writerPath).isTransactionalTable(true)
+            .uniqueIdentifier(
+              System.currentTimeMillis).withBlockSize(2)
+            .buildWriterForCSVInput()
+        }
+      var i = 0
+      while (i < num) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace() // log the failure instead of silently swallowing it
+    }
+  }
+
+  /**
+   * This example demonstrates how to:
+   * 1. write CarbonData files to a storage location on object-based storage
+   *    such as AWS S3 or Huawei OBS using the SDK,
+   * 2. create an external carbon table over the generated files, and
+   * 3. query the table.
+   *
+   * @param args requires "access-key" and "secret-key"; optional parameters are
+   *             "table-path-on-s3", "s3-endpoint", "number-of-rows" and "spark-master"
+   */
+  def main(args: Array[String]) {
+    val logger: Logger = LoggerFactory.getLogger(this.getClass)
+
+    import org.apache.spark.sql.CarbonSession._
+    if (args.length < 2 || args.length > 6) {
+      logger.error("Usage: java S3UsingSDKExample <access-key> <secret-key> " +
+                   "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
+      System.exit(1)
+    }
+
+    // resolve the target table path first so the correct key prefix (s3a/s3n/s3) is configured
+    val path = if (args.length < 3) "s3a://sdk/WriterOutput2" else args(2)
+    val (accessKey, secretKey, endpoint) = getKeyOnPrefix(path)
+    val spark = SparkSession
+      .builder()
+      .master(getSparkMaster(args))
+      .appName("S3UsingSDKExample")
+      .config("spark.driver.host", "localhost")
+      .config(accessKey, args(0))
+      .config(secretKey, args(1))
+      .config(endpoint, getS3EndPoint(args))
+      .getOrCreateCarbonSession()
+
+    spark.sparkContext.setLogLevel("WARN")
+    val num = if (args.length > 4) {
+      Integer.parseInt(args(4))
+    } else {
+      3
+    }
+    buildTestData(path, num)
+
+    spark.sql("DROP TABLE IF EXISTS s3_sdk_table")
+    spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" +
+      s" LOCATION '$path/Fact/Part0/Segment_null'")
+    spark.sql("SELECT * FROM s3_sdk_table LIMIT 10").show()
+    spark.stop()
+  }
+
+  def getKeyOnPrefix(path: String): (String, String, String) = {
+    val endPoint = "spark.hadoop." + ENDPOINT
+    if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+      ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+      ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
+        "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+    } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+      ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
+        "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+    } else {
+      throw new Exception("Unsupported store path, expected a path starting with s3a://, s3n:// or s3://")
+    }
+  }
+
+  def getS3EndPoint(args: Array[String]): String = {
+    if (args.length >= 4 && args(3).contains(".com")) args(3)
+    else ""
+  }
+
+  def getSparkMaster(args: Array[String]): String = {
+    if (args.length == 6) args(5)
+    else if (args.length > 3 &&
+             (args(3).contains("spark:") || args(3).contains("mesos:"))) args(3)
+    else "local"
+  }
+}