You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/01/09 08:42:53 UTC
carbondata git commit: [CARBONDATA-3210] Merge common method into
CarbonSparkUtil and fix example error
Repository: carbondata
Updated Branches:
refs/heads/master 77d2b4e8d -> 3a41ee5df
[CARBONDATA-3210] Merge common method into CarbonSparkUtil and fix example error
1. Merge public methods into spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
org.apache.carbondata.examples.S3UsingSDKExample#getKeyOnPrefix
org.apache.carbondata.examples.S3Example$#getKeyOnPrefix
org.apache.carbondata.spark.thriftserver.CarbonThriftServer#getKeyOnPrefix
2. Fix the error in S3UsingSDKExample
This closes #3032
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3a41ee5d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3a41ee5d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3a41ee5d
Branch: refs/heads/master
Commit: 3a41ee5dfd513581ad832ec05ca330759f98bc0e
Parents: 77d2b4e
Author: xiaohui0318 <24...@qq.com>
Authored: Fri Dec 28 14:08:54 2018 +0800
Committer: xubo245 <xu...@huawei.com>
Committed: Wed Jan 9 16:42:39 2019 +0800
----------------------------------------------------------------------
.../apache/carbondata/examples/S3Example.scala | 34 +++----------
.../carbondata/examples/S3UsingSDkExample.scala | 51 +++++++-------------
.../spark/thriftserver/CarbonThriftServer.scala | 37 ++++----------
.../carbondata/spark/util/CarbonSparkUtil.scala | 40 +++++++++++++--
4 files changed, 68 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a41ee5d/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
index 9cc43d0..6c5bfc6 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3Example.scala
@@ -18,11 +18,10 @@ package org.apache.carbondata.examples
import java.io.File
-import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.{Row, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.util.CarbonSparkUtil
object S3Example {
@@ -38,18 +37,18 @@ object S3Example {
*/
def main(args: Array[String]) {
val rootPath = new File(this.getClass.getResource("/").getPath
- + "../../../..").getCanonicalPath
+ + "../../../..").getCanonicalPath
val path = s"$rootPath/examples/spark2/src/main/resources/data1.csv"
val logger: Logger = LoggerFactory.getLogger(this.getClass)
import org.apache.spark.sql.CarbonSession._
if (args.length < 3 || args.length > 5) {
logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
- "<table-path-on-s3> [s3-endpoint] [spark-master]")
+ "<table-path-on-s3> [s3-endpoint] [spark-master]")
System.exit(0)
}
- val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+ val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(2))
val spark = SparkSession
.builder()
.master(getSparkMaster(args))
@@ -57,13 +56,12 @@ object S3Example {
.config("spark.driver.host", "localhost")
.config(accessKey, args(0))
.config(secretKey, args(1))
- .config(endpoint, getS3EndPoint(args))
+ .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
.getOrCreateCarbonSession()
spark.sparkContext.setLogLevel("WARN")
spark.sql("Drop table if exists carbon_table")
-
spark.sql(
s"""
| CREATE TABLE if not exists carbon_table(
@@ -79,7 +77,7 @@ object S3Example {
| floatField FLOAT
| )
| STORED BY 'carbondata'
- | LOCATION '${ args(2) }'
+ | LOCATION '${args(2)}'
| TBLPROPERTIES('SORT_COLUMNS'='', 'DICTIONARY_INCLUDE'='dateField, charField')
""".stripMargin)
@@ -136,26 +134,6 @@ object S3Example {
spark.stop()
}
- def getKeyOnPrefix(path: String): (String, String, String) = {
- val endPoint = "spark.hadoop." + ENDPOINT
- if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
- ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
- } else {
- throw new Exception("Incorrect Store Path")
- }
- }
-
- def getS3EndPoint(args: Array[String]): String = {
- if (args.length >= 4 && args(3).contains(".com")) args(3)
- else ""
- }
-
def getSparkMaster(args: Array[String]): String = {
if (args.length == 5) args(4)
else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a41ee5d/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
index e0c2cdc..c335daf 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/S3UsingSDkExample.scala
@@ -16,28 +16,29 @@
*/
package org.apache.carbondata.examples
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.SparkSession
import org.slf4j.{Logger, LoggerFactory}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
+import org.apache.carbondata.spark.util.CarbonSparkUtil
+
/**
* Generate data and write data to S3
* User can generate different numbers of data by specifying the number-of-rows in parameters
*/
-object S3UsingSDKExample {
+object S3UsingSdkExample {
// prepare SDK writer output
def buildTestData(
- path: String,
- num: Int = 3): Any = {
+ args: Array[String],
+ path: String,
+ num: Int = 3): Any = {
// getCanonicalPath gives path with \, but the code expects /.
- val writerPath = path.replace("\\", "/");
+ val writerPath = path.replace("\\", "/")
val fields: Array[Field] = new Array[Field](3)
fields(0) = new Field("name", DataTypes.STRING)
@@ -50,16 +51,20 @@ object S3UsingSDKExample {
builder.outputPath(writerPath)
.uniqueIdentifier(System.currentTimeMillis)
.withBlockSize(2)
+ .writtenBy("S3UsingSdkExample")
+ .withHadoopConf(ACCESS_KEY, args(0))
+ .withHadoopConf(SECRET_KEY, args(1))
+ .withHadoopConf(ENDPOINT, CarbonSparkUtil.getS3EndPoint(args))
.withCsvInput(new Schema(fields)).build()
var i = 0
- var row = num
+ val row = num
while (i < row) {
writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
i += 1
}
writer.close()
} catch {
- case ex: Throwable => None
+ case e: Exception => throw e
}
}
@@ -79,11 +84,11 @@ object S3UsingSDKExample {
import org.apache.spark.sql.CarbonSession._
if (args.length < 2 || args.length > 6) {
logger.error("Usage: java CarbonS3Example <access-key> <secret-key>" +
- "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
+ "[table-path-on-s3] [s3-endpoint] [number-of-rows] [spark-master]")
System.exit(0)
}
- val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(2))
+ val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(2))
val spark = SparkSession
.builder()
.master(getSparkMaster(args))
@@ -91,7 +96,7 @@ object S3UsingSDKExample {
.config("spark.driver.host", "localhost")
.config(accessKey, args(0))
.config(secretKey, args(1))
- .config(endpoint, getS3EndPoint(args))
+ .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
.getOrCreateCarbonSession()
spark.sparkContext.setLogLevel("WARN")
@@ -105,7 +110,7 @@ object S3UsingSDKExample {
} else {
3
}
- buildTestData(path, num)
+ buildTestData(args, path, num)
spark.sql("DROP TABLE IF EXISTS s3_sdk_table")
spark.sql(s"CREATE EXTERNAL TABLE s3_sdk_table STORED BY 'carbondata'" +
@@ -114,29 +119,9 @@ object S3UsingSDKExample {
spark.stop()
}
- def getKeyOnPrefix(path: String): (String, String, String) = {
- val endPoint = "spark.hadoop." + ENDPOINT
- if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
- ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
- } else {
- throw new Exception("Incorrect Store Path")
- }
- }
-
- def getS3EndPoint(args: Array[String]): String = {
- if (args.length >= 4 && args(3).contains(".com")) args(3)
- else ""
- }
-
def getSparkMaster(args: Array[String]): String = {
if (args.length == 6) args(5)
- else if (args(3).contains("spark:") || args(3).contains("mesos:")) args(3)
else "local"
}
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a41ee5d/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
index ce46af3..e268e5d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/thriftserver/CarbonThriftServer.scala
@@ -28,12 +28,13 @@ import org.slf4j.{Logger, LoggerFactory}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.util.CarbonSparkUtil
-/**
- * CarbonThriftServer support different modes:
- * 1. read/write data from/to HDFS or local,it only needs configurate storePath
- * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
- */
+ /**
+ * CarbonThriftServer support different modes:
+ * 1. read/write data from/to HDFS or local,it only needs configurate storePath
+ * 2. read/write data from/to S3, it needs provide access-key, secret-key, s3-endpoint
+ */
object CarbonThriftServer {
def main(args: Array[String]): Unit = {
@@ -72,10 +73,10 @@ object CarbonThriftServer {
val spark = if (args.length <= 1) {
builder.getOrCreateCarbonSession(storePath)
} else {
- val (accessKey, secretKey, endpoint) = getKeyOnPrefix(args(0))
+ val (accessKey, secretKey, endpoint) = CarbonSparkUtil.getKeyOnPrefix(args(0))
builder.config(accessKey, args(1))
.config(secretKey, args(2))
- .config(endpoint, getS3EndPoint(args))
+ .config(endpoint, CarbonSparkUtil.getS3EndPoint(args))
.getOrCreateCarbonSession(storePath)
}
@@ -86,31 +87,11 @@ object CarbonThriftServer {
case e: Exception =>
val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOG.error(s"Wrong value for carbon.spark.warmUpTime $warmUpTime " +
- "Using default Value and proceeding")
+ "Using default Value and proceeding")
Thread.sleep(5000)
}
HiveThriftServer2.startWithContext(spark.sqlContext)
}
- def getKeyOnPrefix(path: String): (String, String, String) = {
- val endPoint = "spark.hadoop." + ENDPOINT
- if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
- ("spark.hadoop." + ACCESS_KEY, "spark.hadoop." + SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3N_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
- } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
- ("spark.hadoop." + CarbonCommonConstants.S3_ACCESS_KEY,
- "spark.hadoop." + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
- } else {
- throw new Exception("Incorrect Store Path")
- }
- }
-
- def getS3EndPoint(args: Array[String]): String = {
- if (args.length >= 4 && args(3).contains(".com")) args(3)
- else ""
- }
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3a41ee5d/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
index b2687d0..8feb1b9 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/util/CarbonSparkUtil.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.hadoop.fs.s3a.Constants.{ACCESS_KEY, ENDPOINT, SECRET_KEY}
import org.apache.spark.sql.hive.{CarbonMetaData, CarbonRelation, DictionaryMap}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -30,6 +31,9 @@ import org.apache.carbondata.core.util.CarbonUtil
case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
+/**
+ * carbon spark common methods
+ */
object CarbonSparkUtil {
def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
@@ -41,7 +45,7 @@ object CarbonSparkUtil {
carbonTable.getDimensionByTableName(carbonTable.getTableName).asScala.map { f =>
(f.getColName.toLowerCase,
f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !f.getDataType.isComplexType)
+ !f.getDataType.isComplexType)
}
CarbonMetaData(dimensionsAttr,
measureAttr,
@@ -62,8 +66,8 @@ object CarbonSparkUtil {
/**
* return's the formatted column comment if column comment is present else empty("")
*
- * @param carbonColumn
- * @return
+ * @param carbonColumn the column of carbonTable
+ * @return string
*/
def getColumnComment(carbonColumn: CarbonColumn): String = {
{
@@ -81,8 +85,8 @@ object CarbonSparkUtil {
/**
* the method return's raw schema
*
- * @param carbonRelation
- * @return
+ * @param carbonRelation logical plan for one carbon table
+ * @return schema
*/
def getRawSchema(carbonRelation: CarbonRelation): String = {
val fields = new Array[String](
@@ -110,6 +114,10 @@ object CarbonSparkUtil {
/**
* add escape prefix for delimiter
+ *
+ * @param delimiter A delimiter is a sequence of one or more characters
+ * used to specify the boundary between separate
+ * @return delimiter
*/
def delimiterConverter4Udf(delimiter: String): String = delimiter match {
case "|" | "*" | "." | ":" | "^" | "\\" | "$" | "+" | "?" | "(" | ")" | "{" | "}" | "[" | "]" =>
@@ -117,4 +125,26 @@ object CarbonSparkUtil {
case _ =>
delimiter
}
+
+ def getKeyOnPrefix(path: String): (String, String, String) = {
+ val prefix = "spark.hadoop."
+ val endPoint = prefix + ENDPOINT
+ if (path.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+ (prefix + ACCESS_KEY, prefix + SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3N_PREFIX)) {
+ (prefix + CarbonCommonConstants.S3N_ACCESS_KEY,
+ prefix + CarbonCommonConstants.S3N_SECRET_KEY, endPoint)
+ } else if (path.startsWith(CarbonCommonConstants.S3_PREFIX)) {
+ (prefix + CarbonCommonConstants.S3_ACCESS_KEY,
+ prefix + CarbonCommonConstants.S3_SECRET_KEY, endPoint)
+ } else {
+ throw new Exception("Incorrect Store Path")
+ }
+ }
+
+ def getS3EndPoint(args: Array[String]): String = {
+ if (args.length >= 4 && args(3).contains(".com")) args(3)
+ else ""
+ }
+
}