You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/11/20 04:59:20 UTC

carbondata git commit: [CARBONDATA-3106] WrittenbyAPI not serialized in executor with globalsort

Repository: carbondata
Updated Branches:
  refs/heads/master d4e8ba441 -> da91d4cc6


[CARBONDATA-3106] WrittenbyAPI not serialized in executor with globalsort

Problem:
Written_By_APPNAME when added in carbonproperty is not serialized in executor with global sort

Solution:
Add Written_by_APPNAME in hadoop conf and in executor side get it from configuration and add to carbonproperty

This closes #2928


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da91d4cc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da91d4cc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da91d4cc

Branch: refs/heads/master
Commit: da91d4cc6805ce63aade48562a4f367442b38d4a
Parents: d4e8ba4
Author: Indhumathi27 <in...@gmail.com>
Authored: Fri Nov 16 21:49:16 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Nov 20 10:34:28 2018 +0530

----------------------------------------------------------------------
 .../spark/load/DataLoadProcessBuilderOnSpark.scala        |  5 ++---
 .../spark/load/DataLoadProcessorStepOnSpark.scala         |  6 +++++-
 .../store/writer/v3/CarbonFactDataWriterImplV3.java       | 10 +++++++---
 3 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 338180d..8ded6bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
-        sparkSession.sparkContext.appName)
+    hadoopConf
+      .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
 
     val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0a68fb0..2ca47b3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark {
       modelBroadcast: Broadcast[CarbonLoadModel],
       rowCounter: Accumulator[Int],
       conf: Configuration) {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+        conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME))
     ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
     var model: CarbonLoadModel = null
     var tableName: String = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index f168796..ccbc544 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -104,9 +104,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
           .convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
               thriftColumnSchemaList.size());
       convertFileMeta.setIs_sort(isSorted);
-      convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
-          CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME));
+      String appName = CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+      if (appName == null) {
+        throw new CarbonDataWriterException(
+            "DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null");
+      }
+      convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName);
       convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
           CarbonVersionConstants.CARBONDATA_VERSION);
       // fill the carbon index details