Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:45 UTC

[43/45] carbondata git commit: [CARBONDATA-2993] fix random NPE during concurrent loading

[CARBONDATA-2993] fix random NPE during concurrent loading

This closes #2797
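
Judging from the diff, the NPE came from driver-side code fetching the Hadoop
configuration back out of the Spark broadcast (config.value.value) while
several loads ran concurrently. The fix keeps a @transient driver-side copy
(hadoopConf) for driver paths such as getPartitions and leaves the broadcast
value to the executors. A second, smaller change makes AvroCarbonWriter name
an array's child field after its enclosing field rather than the literal
"val". Illustrative sketches follow each hunk below.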


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fa088256
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fa088256
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fa088256

Branch: refs/heads/branch-1.5
Commit: fa0882569872d3280807a5a57f36c4c43f48cc99
Parents: ca30ad9
Author: kunal642 <ku...@gmail.com>
Authored: Fri Oct 5 10:13:05 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Oct 5 15:31:33 2018 +0530

----------------------------------------------------------------------
 .../scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala   | 9 +++++----
 .../org/apache/carbondata/sdk/file/AvroCarbonWriter.java    | 2 +-
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 87d8f50..3a02f85 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -47,8 +47,10 @@ abstract class CarbonRDD[T: ClassTag](
     info
   }
 
+  @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
+
   val config: Broadcast[SerializableConfiguration] = sparkContext
-    .broadcast(new SerializableConfiguration(SparkSQLUtil.sessionState(ss).newHadoopConf()))
+    .broadcast(new SerializableConfiguration(hadoopConf))
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
@@ -57,7 +59,7 @@ abstract class CarbonRDD[T: ClassTag](
   protected def internalGetPartitions: Array[Partition]
 
   override def getPartitions: Array[Partition] = {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(config.value.value)
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(hadoopConf)
     internalGetPartitions
   }
 
@@ -66,8 +68,7 @@ abstract class CarbonRDD[T: ClassTag](
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
     TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
-    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", config
-      .value.value)
+    carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo

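For context, here is a minimal self-contained Scala sketch of the wrapper this
hunk relies on. Hadoop's Configuration is Writable but not
java.io.Serializable, so it cannot ride inside a Spark broadcast unwrapped;
the class below mirrors the usual wrapper pattern, and CarbonData's actual
SerializableConfiguration may differ in detail.

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.hadoop.conf.Configuration

    // Illustrative stand-in for the SerializableConfiguration used above:
    // (de)serialize the Configuration through its Writable methods so that
    // Java serialization, and hence a Spark broadcast, can carry it.
    class SerializableConfiguration(@transient var value: Configuration)
      extends Serializable {

      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out) // Configuration implements org.apache.hadoop.io.Writable
      }

      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new Configuration(false) // rebuilt on the receiving side
        value.readFields(in)
      }
    }

The division of labor after the hunk is then: the @transient hadoopConf never
travels with the task closure, so driver-side calls such as getPartitions read
it directly, while compute on the executors goes through the broadcast.
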
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fa088256/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index d19a96d..e4a65c0 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -520,7 +520,7 @@ public class AvroCarbonWriter extends CarbonWriter {
         // recursively get the sub fields
         ArrayList<StructField> arraySubField = new ArrayList<>();
         // array will have only one sub field.
-        StructField structField = prepareSubFields("val", childSchema.getElementType());
+        StructField structField = prepareSubFields(fieldName, childSchema.getElementType());
         if (structField != null) {
           arraySubField.add(structField);
           return new Field(fieldName, "array", arraySubField);
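
The AvroCarbonWriter hunk stops hard-coding the array's single child field as
"val" and names it after the enclosing field instead. A toy Scala model of why
the child's name matters (names and types here are illustrative, not
CarbonData's API): complex children flatten into dotted column paths, so
whatever the array child is called reappears in every descendant column.

    // Toy model: a field tree whose leaves flatten into dotted column paths.
    case class Field(name: String, children: List[Field] = Nil)

    def columnPaths(prefix: String, f: Field): List[String] = {
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      if (f.children.isEmpty) List(path)
      else f.children.flatMap(columnPaths(path, _))
    }

    // Before: the array child was always literally "val"
    println(columnPaths("", Field("addresses",
      List(Field("val", List(Field("city")))))))   // List(addresses.val.city)

    // After: the child reuses the enclosing field's name
    println(columnPaths("", Field("addresses",
      List(Field("addresses", List(Field("city"))))))) // List(addresses.addresses.city)

Deriving the name from fieldName presumably keeps each generated column path
tied to the Avro field it came from instead of a fixed placeholder.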