You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/31 12:53:41 UTC

carbondata git commit: [CARBONDATA-3025]handle passing spark appname for partition table and file format

Repository: carbondata
Updated Branches:
  refs/heads/master 94a4f8314 -> 625844784


[CARBONDATA-3025]handle passing spark appname for partition table and file format

Changes in this PR

1. Data load with the partition table file format fails, because the app name is not present in CarbonProperties on the executor.
2. This PR sets the Spark app name in CarbonProperties, which is then written to the CarbonData file footer.
3. The app name is set in the Hadoop conf, and the executor reads it back from that conf to set it in CarbonProperties. Instead of hardcoding the Spark property key ("spark.app.name"), the exposed SparkContext API appName is used.

This closes #2861


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62584478
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62584478
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62584478

Branch: refs/heads/master
Commit: 6258447849f0fb0934b6a0e728d99d62223a2854
Parents: 94a4f83
Author: akashrn5 <ak...@gmail.com>
Authored: Fri Oct 26 19:34:04 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 31 20:52:51 2018 +0800

----------------------------------------------------------------------
 .../carbondata/hadoop/api/CarbonTableOutputFormat.java      | 6 ++++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala          | 2 +-
 .../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala   | 9 ++++-----
 .../execution/datasources/SparkCarbonFileFormat.scala       | 6 +-----
 .../sql/execution/datasources/SparkCarbonTableFormat.scala  | 8 ++++----
 5 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f0f2858..0bcd7e1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -236,6 +236,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
   public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
       final TaskAttemptContext taskAttemptContext) throws IOException {
     final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
+    String appName =
+        taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+    if (null != appName) {
+      CarbonProperties.getInstance()
+          .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName);
+    }
     //if loadModel having taskNo already(like in SDK) then no need to overwrite
     short sdkWriterCores = loadModel.getSdkWriterCores();
     int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index d794636..338180d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -68,7 +68,7 @@ object DataLoadProcessBuilderOnSpark {
 
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
+        sparkSession.sparkContext.appName)
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index cdfce56..ce08f8f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -37,6 +37,10 @@ abstract class CarbonRDD[T: ClassTag](
     @transient private val ss: SparkSession,
     @transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
 
+  @transient val sparkAppName: String = ss.sparkContext.appName
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName)
+
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (info == null || info.getSessionParams == null) {
@@ -57,11 +61,6 @@ abstract class CarbonRDD[T: ClassTag](
 
   protected def internalGetPartitions: Array[Partition]
 
-
-  CarbonProperties.getInstance()
-    .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-      ss.sparkContext.getConf.get("spark.app.name"))
-
   override def getPartitions: Array[Partition] = {
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
     internalGetPartitions

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 9e3a4c8..cd2035c 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -121,15 +121,11 @@ class SparkCarbonFileFormat extends FileFormat
       dataSchema: StructType): OutputWriterFactory = {
 
     val conf = job.getConfiguration
-
+    conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
     val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
     model.setLoadWithoutConverterStep(true)
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
-
     new OutputWriterFactory {
       override def newInstance(
           path: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 47d6a71..148d317 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -121,10 +121,6 @@ with Serializable {
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     model.setLoadWithoutConverterStep(true)
-    carbonProperty
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.getConf.get("spark.app.name"))
-
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {
       conf.set("carbon.staticpartition", staticPartition)
@@ -159,6 +155,7 @@ with Serializable {
     if (updateTimeStamp.isDefined) {
       conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
     }
+    conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
@@ -175,6 +172,9 @@ with Serializable {
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
         val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+        val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
+        CarbonProperties.getInstance().addProperty(
+          CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
         val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
         val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
         CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)