You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/10/31 12:53:41 UTC
carbondata git commit: [CARBONDATA-3025]handle passing spark appname
for partition table and file format
Repository: carbondata
Updated Branches:
refs/heads/master 94a4f8314 -> 625844784
[CARBONDATA-3025]handle passing spark appname for partition table and file format
Changes in this PR
1. Data load with a partition table using the file format fails, because the app name is not present in CarbonProperties on the executor.
2.This PR sets the spark appname in carbon properties which will be written to carbondata footer.
3. The app name is set in the Hadoop conf, and the executor then reads it back from that conf and sets it in CarbonProperties.
4. Instead of hard-coding the "spark.app.name" property key, the app name is obtained from the exposed SparkContext.appName API.
This closes #2861
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/62584478
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/62584478
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/62584478
Branch: refs/heads/master
Commit: 6258447849f0fb0934b6a0e728d99d62223a2854
Parents: 94a4f83
Author: akashrn5 <ak...@gmail.com>
Authored: Fri Oct 26 19:34:04 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 31 20:52:51 2018 +0800
----------------------------------------------------------------------
.../carbondata/hadoop/api/CarbonTableOutputFormat.java | 6 ++++++
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 2 +-
.../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala | 9 ++++-----
.../execution/datasources/SparkCarbonFileFormat.scala | 6 +-----
.../sql/execution/datasources/SparkCarbonTableFormat.scala | 8 ++++----
5 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f0f2858..0bcd7e1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -236,6 +236,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
public RecordWriter<NullWritable, ObjectArrayWritable> getRecordWriter(
final TaskAttemptContext taskAttemptContext) throws IOException {
final CarbonLoadModel loadModel = getLoadModel(taskAttemptContext.getConfiguration());
+ String appName =
+ taskAttemptContext.getConfiguration().get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME);
+ if (null != appName) {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName);
+ }
//if loadModel having taskNo already(like in SDK) then no need to overwrite
short sdkWriterCores = loadModel.getSdkWriterCores();
int itrSize = (sdkWriterCores > 0) ? sdkWriterCores : 1;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index d794636..338180d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -68,7 +68,7 @@ object DataLoadProcessBuilderOnSpark {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
- sparkSession.sparkContext.getConf.get("spark.app.name"))
+ sparkSession.sparkContext.appName)
val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index cdfce56..ce08f8f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -37,6 +37,10 @@ abstract class CarbonRDD[T: ClassTag](
@transient private val ss: SparkSession,
@transient private var deps: Seq[Dependency[_]]) extends RDD[T](ss.sparkContext, deps) {
+ @transient val sparkAppName: String = ss.sparkContext.appName
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkAppName)
+
val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
if (info == null || info.getSessionParams == null) {
@@ -57,11 +61,6 @@ abstract class CarbonRDD[T: ClassTag](
protected def internalGetPartitions: Array[Partition]
-
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
- ss.sparkContext.getConf.get("spark.app.name"))
-
override def getPartitions: Array[Partition] = {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
internalGetPartitions
http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 9e3a4c8..cd2035c 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -121,15 +121,11 @@ class SparkCarbonFileFormat extends FileFormat
dataSchema: StructType): OutputWriterFactory = {
val conf = job.getConfiguration
-
+ conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
model.setLoadWithoutConverterStep(true)
CarbonTableOutputFormat.setLoadModel(conf, model)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
- sparkSession.sparkContext.getConf.get("spark.app.name"))
-
new OutputWriterFactory {
override def newInstance(
path: String,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/62584478/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index 47d6a71..148d317 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -121,10 +121,6 @@ with Serializable {
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
model.setLoadWithoutConverterStep(true)
- carbonProperty
- .addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
- sparkSession.sparkContext.getConf.get("spark.app.name"))
-
val staticPartition = options.getOrElse("staticpartition", null)
if (staticPartition != null) {
conf.set("carbon.staticpartition", staticPartition)
@@ -159,6 +155,7 @@ with Serializable {
if (updateTimeStamp.isDefined) {
conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
}
+ conf.set(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, sparkSession.sparkContext.appName)
CarbonTableOutputFormat.setLoadModel(conf, model)
new OutputWriterFactory {
@@ -175,6 +172,9 @@ with Serializable {
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+ val appName = context.getConfiguration.get(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME)
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, appName)
val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
val storeLocation = CommonUtil.getTempStoreLocations(taskNumber)
CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)