You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/26 13:59:40 UTC

carbondata git commit: [CARBONDATA-2260] CarbonThriftServer should support store carbon table on S3

Repository: carbondata
Updated Branches:
  refs/heads/master 0668e7d71 -> 3262230cb


[CARBONDATA-2260] CarbonThriftServer should support store carbon table on S3

CarbonThriftServer should support storing carbon tables on S3.
Add configuration for the access key (AK), secret key (SK) and endpoint when starting the CarbonSession.

This closes #2073


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3262230c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3262230c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3262230c

Branch: refs/heads/master
Commit: 3262230cb3e099c20b2a14d9d55cad4b9fe91e2e
Parents: 0668e7d
Author: root <60...@qq.com>
Authored: Sat Mar 17 17:45:34 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Apr 26 21:59:21 2018 +0800

----------------------------------------------------------------------
 .../spark/thriftserver/CarbonThriftServer.scala | 45 +++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3262230c/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index 34ac940..ce46af3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -19,14 +19,21 @@ package org.apache.carbondata.spark.thriftserver
 
 import java.io.File
 
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+import org.slf4j.{Logger, LoggerFactory}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
+/**
+ * CarbonThriftServer supports different modes:
+ * 1. read/write data from/to HDFS or local storage: only storePath needs to be configured
+ * 2. read/write data from/to S3: access-key, secret-key and s3-endpoint must also be provided
+ */
 object CarbonThriftServer {
 
   def main(args: Array[String]): Unit = {
@@ -34,6 +41,13 @@ object CarbonThriftServer {
     import org.apache.spark.sql.CarbonSession._
 
     val sparkConf = new SparkConf(loadDefaults = true)
+
+    val logger: Logger = LoggerFactory.getLogger(this.getClass)
+    if (args.length != 0 && args.length != 1 && args.length != 4) {
+      logger.error("parameters: storePath [access-key] [secret-key] [s3-endpoint]")
+      System.exit(0)
+    }
+
     val builder = SparkSession
       .builder()
       .config(sparkConf)
@@ -55,7 +69,16 @@ object CarbonThriftServer {
 
     val storePath = if (args.length > 0) args.head else null
 
-    val spark = builder.getOrCreateCarbonSession(storePath)
+    val spark = if (args.length <= 1) {
+      builder.getOrCreateCarbonSession(storePath)
+    } else {
+      val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(0))
+      builder.config(accessKey, args(1))
+        .config(secretKey, args(2))
+        .config(endpoint, getS3EndPoint(args))
+        .getOrCreateCarbonSession(storePath)
+    }
+
     val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
     try {
       Thread.sleep(Integer.parseInt(warmUpTime))
@@ -70,4 +93,24 @@ object CarbonThriftServer {
     HiveThriftServer2.startWithContext(spark.sqlContext)
   }
 
+  /**
+   * Resolves the Spark configuration keys used to pass S3 credentials for a store path.
+   *
+   * The scheme prefix of `path` selects the access-key and secret-key property names
+   * (S3A, S3N or S3); every key is prefixed with "spark.hadoop.". The endpoint key is
+   * always built from the S3A `ENDPOINT` constant, whichever scheme matched.
+   *
+   * @param path store path; expected to start with one of the S3A/S3N/S3 prefixes
+   * @return (access-key property, secret-key property, endpoint property)
+   * @throws IllegalArgumentException if `path` is null or has none of the supported prefixes
+   */
+  def getKeyOnPrefix(path: String): (String, String, String) = {
+    val hadoopPrefix = "spark.hadoop."
+    // NOTE(review): the S3A endpoint key is reused for S3N and S3 paths as well -- confirm
+    // that those connectors actually read it.
+    val endPoint = hadoopPrefix + ENDPOINT
+    // Option(path) turns a null path into the same, descriptive failure as an unknown scheme.
+    Option(path) match {
+      case Some(p) if p.startsWith(CarbonCommonConstants.S3A_PREFIX) =>
+        (hadoopPrefix + ACCESS_KEY, hadoopPrefix + SECRET_KEY, endPoint)
+      case Some(p) if p.startsWith(CarbonCommonConstants.S3N_PREFIX) =>
+        (hadoopPrefix + CarbonCommonConstants.S3N_ACCESS_KEY,
+          hadoopPrefix + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+      case Some(p) if p.startsWith(CarbonCommonConstants.S3_PREFIX) =>
+        (hadoopPrefix + CarbonCommonConstants.S3_ACCESS_KEY,
+          hadoopPrefix + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+      case _ =>
+        // IllegalArgumentException is still an Exception, so existing handlers keep working;
+        // the offending value is included to make the failure diagnosable.
+        throw new IllegalArgumentException(s"Incorrect Store Path: $path")
+    }
+  }
+
+  /**
+   * Picks the S3 endpoint out of the command-line arguments.
+   *
+   * @param args program arguments; the endpoint, if any, is the fourth element
+   * @return the fourth argument when it is present and contains ".com", otherwise ""
+   */
+  def getS3EndPoint(args: Array[String]): String = {
+    // NOTE(review): a fourth argument without ".com" (e.g. host:port) is discarded and ""
+    // is returned instead -- confirm this is the intended way to request the default endpoint.
+    args.drop(3).headOption match {
+      case Some(candidate) if candidate.contains(".com") => candidate
+      case _ => ""
+    }
+  }
+
 }