You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/11/20 04:59:20 UTC
carbondata git commit: [CARBONDATA-3106] WrittenbyAPI not serialized
in executor with globalsort
Repository: carbondata
Updated Branches:
refs/heads/master d4e8ba441 -> da91d4cc6
[CARBONDATA-3106] WrittenbyAPI not serialized in executor with globalsort
Problem:
CARBON_WRITTEN_BY_APPNAME, when added to CarbonProperties, is not serialized to the executors when loading with global sort.
Solution:
Set CARBON_WRITTEN_BY_APPNAME in the Hadoop configuration; on the executor side, read it from that configuration and add it to CarbonProperties.
This closes #2928
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da91d4cc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da91d4cc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da91d4cc
Branch: refs/heads/master
Commit: da91d4cc6805ce63aade48562a4f367442b38d4a
Parents: d4e8ba4
Author: Indhumathi27 <in...@gmail.com>
Authored: Fri Nov 16 21:49:16 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Nov 20 10:34:28 2018 +0530
----------------------------------------------------------------------
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 5 ++---
.../spark/load/DataLoadProcessorStepOnSpark.scala | 6 +++++-
.../store/writer/v3/CarbonFactDataWriterImplV3.java | 10 +++++++---
3 files changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 338180d..8ded6bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -66,9 +66,8 @@ object DataLoadProcessBuilderOnSpark {
val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
- sparkSession.sparkContext.appName)
+ hadoopConf
+ .set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0a68fb0..2ca47b3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -26,9 +26,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException
import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
import org.apache.carbondata.processing.loading.{BadRecordsLogger, BadRecordsLoggerProvider, CarbonDataLoadConfiguration, DataLoadProcessBuilder, TableProcessingOperations}
import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
@@ -228,6 +229,9 @@ object DataLoadProcessorStepOnSpark {
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int],
conf: Configuration) {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
+ conf.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME))
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
var model: CarbonLoadModel = null
var tableName: String = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da91d4cc/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index f168796..ccbc544 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -104,9 +104,13 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
.convertFileFooterVersion3(blockletMetadata, blockletIndex, localCardinality,
thriftColumnSchemaList.size());
convertFileMeta.setIs_sort(isSorted);
- convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO,
- CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME));
+ String appName = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+ if (appName == null) {
+ throw new CarbonDataWriterException(
+ "DataLoading failed as CARBON_WRITTEN_BY_APPNAME is null");
+ }
+ convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO, appName);
convertFileMeta.putToExtra_info(CarbonCommonConstants.CARBON_WRITTEN_VERSION,
CarbonVersionConstants.CARBONDATA_VERSION);
// fill the carbon index details