You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/05 10:01:45 UTC
carbondata git commit: [CARBONDATA-2993] fix random NPE while
concurrent loading
Repository: carbondata
Updated Branches:
refs/heads/master ca30ad97d -> fa0882569
[CARBONDATA-2993] fix random NPE while concurrent loading
This closes #2797
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa088256
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa088256
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa088256
Branch: refs/heads/master
Commit: fa0882569872d3280807a5a57f36c4c43f48cc99
Parents: ca30ad9
Author: kunal642 <ku...@gmail.com>
Authored: Fri Oct 5 10:13:05 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Oct 5 15:31:33 2018 +0530
----------------------------------------------------------------------
.../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala | 9 +++++----
.../org/apache/carbondata/sdk/file/AvroCarbonWriter.java | 2 +-
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 87d8f50..3a02f85 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -47,8 +47,10 @@ abstract class CarbonRDD[T: ClassTag](
info
}
+ @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
+
val config: Broadcast[SerializableConfiguration] = sparkContext
- .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
+ .broadcast(new SerializableConfiguration(hadoopConf))
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
@@ -57,7 +59,7 @@ abstract class CarbonRDD[T: ClassTag](
protected def internalGetPartitions: Array[Partition]
override def getPartitions: Array[Partition] = {
- ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
internalGetPartitions
}
@@ -66,8 +68,7 @@ abstract class CarbonRDD[T: ClassTag](
final def compute(split: Partition, context: TaskContext): Iterator[T] = {
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
- carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
- .value.value)
+ carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
val carbonTaskInfo = new CarbonTaskInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index d19a96d..e4a65c0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -520,7 +520,7 @@ public class AvroCarbonWriter extends CarbonWriter {
// recursively get the sub fields
ArrayList<StructField> arraySubField = new ArrayList<>();
// array will have only one sub field.
- StructField structField = prepareSubFields("val", childSchema.getElementType());
+ StructField structField = prepareSubFields(fieldName, childSchema.getElementType());
if (structField != null) {
arraySubField.add(structField);
return new Field(fieldName, "array", arraySubField);