You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/04/26 13:59:40 UTC
carbondata git commit: [CARBONDATA-2260] CarbonThriftServer should
support storing carbon tables on S3
Repository: carbondata
Updated Branches:
refs/heads/master 0668e7d71 -> 3262230cb
[CARBONDATA-2260] CarbonThriftServer should support storing carbon tables on S3
CarbonThriftServer should support storing carbon tables on S3
Add configuration for the access key, secret key and endpoint when starting a CarbonSession
This closes #2073
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3262230c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3262230c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3262230c
Branch: refs/heads/master
Commit: 3262230cb3e099c20b2a14d9d55cad4b9fe91e2e
Parents: 0668e7d
Author: root <60...@qq.com>
Authored: Sat Mar 17 17:45:34 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Apr 26 21:59:21 2018 +0800
----------------------------------------------------------------------
.../spark/thriftserver/CarbonThriftServer.scala | 45 +++++++++++++++++++-
1 file changed, 44 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3262230c/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index 34ac940..ce46af3 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -19,14 +19,21 @@ package org.apache.carbondata.spark.thriftserver
import java.io.File
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+import org.slf4j.{Logger, LoggerFactory}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+/**
+ * CarbonThriftServer support different modes:
+ * 1. read/write data from/to HDFS or local; it only needs the storePath to be configured
+ * 2. read/write data from/to S3; it needs the access-key, secret-key and s3-endpoint to be provided
+ */
object CarbonThriftServer {
def main(args: Array[String]): Unit = {
@@ -34,6 +41,13 @@ object CarbonThriftServer {
import org.apache.spark.sql.CarbonSession._
val sparkConf = new SparkConf(loadDefaults = true)
+
+ val logger: Logger = LoggerFactory.getLogger(this.getClass)
+ if (args.length != 0 && args.length != 1 && args.length != 4) {
+ logger.error("parameters: storePath [access-key] [secret-key] [s3-endpoint]")
+ System.exit(0)
+ }
+
val builder = SparkSession
.builder()
.config(sparkConf)
@@ -55,7 +69,16 @@ object CarbonThriftServer {
val storePath = if (args.length > 0) args.head else null
- val spark = builder.getOrCreateCarbonSession(storePath)
+ val spark = if (args.length <= 1) {
+ builder.getOrCreateCarbonSession(storePath)
+ } else {
+ val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(0))
+ builder.config(accessKey, args(1))
+ .config(secretKey, args(2))
+ .config(endpoint, getS3EndPoint(args))
+ .getOrCreateCarbonSession(storePath)
+ }
+
val warmUpTime = CarbonProperties.getInstance().getProperty("carbon.spark.warmUpTime", "5000")
try {
Thread.sleep(Integer.parseInt(warmUpTime))
@@ -70,4 +93,24 @@ object CarbonThriftServer {
HiveThriftServer2.startWithContext(spark.sqlContext)
}
+ def getKeyOnPrefix(path: String): (String, String, String) = {
+ val endPoint = "spark.hadoop." + ENDPOINT
+ if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+ ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+ ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
+ "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+ ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
+ "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+ } else {
+ throw new Exception("Incorrect Store Path")
+ }
+ }
+
/**
 * Extracts the optional S3 endpoint from the command-line arguments.
 *
 * Expected argument layout: storePath access-key secret-key [s3-endpoint].
 *
 * @param args the program arguments
 * @return the endpoint given as the fourth argument (trimmed), or an empty string
 *         when it is absent or blank (the S3 client then uses its default endpoint)
 */
def getS3EndPoint(args: Array[String]): String = {
  // The previous check only accepted endpoints containing ".com", which silently
  // dropped valid custom endpoints (e.g. "http://127.0.0.1:9000" for MinIO or
  // region endpoints outside the .com domain). Accept any non-blank value instead.
  if (args.length >= 4 && args(3).trim.nonEmpty) args(3).trim else ""
}
+
}