Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/03 12:17:28 UTC

carbondata git commit: [CARBONDATA-2080] [S3-Implementation] Propagated hadoopConf from driver to executor for s3 implementation in cluster mode.

Repository: carbondata
Updated Branches:
  refs/heads/carbonstore c3e99681b -> e502c59a2


[CARBONDATA-2080] [S3-Implementation] Propagated hadoopConf from driver to executor for s3 implementation in cluster mode.

Problem: hadoopConf was not being propagated from the driver to the executors, which is why data load was failing in a distributed environment.
Solution: Set the Hadoop conf in the base class CarbonRDD and propagate it to the executors.
How to verify this PR:
Execute a data load in cluster mode using an S3 location; it should succeed.
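
For reference, the core of the change (visible in the CarbonRDD.scala diff below) is to serialize the driver's Hadoop Configuration, which is a Hadoop Writable, into a byte array held by the RDD, then rebuild it inside compute() on each executor before applying the S3 settings. The following is a minimal, dependency-free sketch of that pattern; the object and method names are illustrative only, and the actual patch additionally compresses the bytes with CompressorFactory and applies the rebuilt conf via CarbonInputFormatUtil.setS3Configurations, neither of which is shown here.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    import org.apache.hadoop.conf.Configuration

    object HadoopConfPropagation {

      // Driver side: serialize the Configuration into bytes so it can be captured
      // by the RDD and shipped to executors (Configuration is not java.io.Serializable).
      def serialize(conf: Configuration): Array[Byte] = {
        val bao = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bao)
        conf.write(oos) // Configuration implements Writable, so write/readFields round-trips it
        oos.close()
        bao.toByteArray
      }

      // Executor side: rebuild the Configuration from the shipped bytes, e.g. to
      // recover fs.s3a.access.key / fs.s3a.secret.key before reading from S3.
      def deserialize(bytes: Array[Byte]): Configuration = {
        val configuration = new Configuration(false)
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        configuration.readFields(ois)
        ois.close()
        configuration
      }
    }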

This closes #1860


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e502c59a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e502c59a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e502c59a

Branch: refs/heads/carbonstore
Commit: e502c59a2d0b95d80db3aff04c749654254eadbe
Parents: c3e9968
Author: Jatin <ja...@knoldus.in>
Authored: Thu Jan 25 16:53:00 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Feb 3 20:17:10 2018 +0800

----------------------------------------------------------------------
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  2 +-
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  2 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  2 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  2 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  2 +-
 .../spark/rdd/CarbonDropPartitionRDD.scala      |  4 +--
 .../spark/rdd/CarbonDropTableRDD.scala          |  2 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  3 +-
 .../spark/rdd/CarbonMergeFilesRDD.scala         |  2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 32 ++++++++++++++++++--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +-
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |  2 +-
 .../apache/spark/rdd/DataLoadCoalescedRDD.scala |  3 +-
 .../apache/spark/rdd/UpdateCoalescedRDD.scala   |  2 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |  2 +-
 16 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 56a66b9..7c1edea 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,7 +50,7 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     identifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 248f351..e14524e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -48,7 +48,7 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: AbsoluteTableIdentifier)
-  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 32523d8..9936a2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -33,7 +33,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 45271a7..b11dfad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -39,7 +39,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
     dimTableName: String,
     storePath: String,
     loadMetadataDetails: List[LoadMetadataDetails])
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 9a1ef33..759ed42 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -34,7 +34,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
   override def getPartitions: Array[Partition] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..800cc36 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -47,7 +47,7 @@ class CarbonDropPartitionRDD(
     partitions: Seq[String],
     uniqueId: String,
     partialMatch: Boolean)
-  extends CarbonRDD[String](sc, Nil) {
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
@@ -105,7 +105,7 @@ class CarbonDropPartitionCommitRDD(
     success: Boolean,
     uniqueId: String,
     partitions: Seq[String])
-  extends CarbonRDD[String](sc, Nil) {
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 652720c..f327d88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -30,7 +30,7 @@ class CarbonDropTableRDD[V: ClassTag](
     valueClass: Value[V],
     databaseName: String,
     tableName: String)
-  extends CarbonRDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 7acf4e2..cf22b3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -506,7 +506,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
     dictFolderPath: String)
-  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
+    sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
index 1087ea7..3f38300 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
@@ -41,7 +41,7 @@ class CarbonMergeFilesRDD(
     tablePath: String,
     segments: Seq[String],
     readFileFooterFromCarbonDataFile: Boolean)
-  extends CarbonRDD[String](sc, Nil) {
+  extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0d0f024..fa126fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -63,7 +63,7 @@ class CarbonMergerRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index bf46f67..6f248d2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,20 +17,26 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util._
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 /**
  * This RDD maintains session level ThreadLocal
  */
 abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+    @transient private var deps: Seq[Dependency[_]],
+    @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
 
   val carbonSessionInfo: CarbonSessionInfo = {
     var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -42,14 +48,24 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     info
   }
 
+  private val confBytes = {
+    val bao = new ByteArrayOutputStream()
+    val oos = new ObjectOutputStream(bao)
+    hadoopConf.write(oos)
+    oos.close()
+    CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
+  }
+
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
-    this (oneParent.context, List(new OneToOneDependency(oneParent)))
+    this (oneParent.context, List(new OneToOneDependency(oneParent)),
+      oneParent.sparkContext.hadoopConfiguration)
 
   // RDD compute logic should be here
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    CarbonInputFormatUtil.setS3Configurations(getConf)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
     val carbonTaskInfo = new CarbonTaskInfo
@@ -59,6 +75,16 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
       map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
     internalCompute(split, context)
   }
+
+  private def getConf: Configuration = {
+    val configuration = new Configuration(false)
+    val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
+      .unCompressByte(confBytes))
+    val ois = new ObjectInputStream(bai)
+    configuration.readFields(ois)
+    ois.close()
+    configuration
+  }
 }
 
 /**
@@ -67,7 +93,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
 abstract class CarbonRDDWithTableInfo[T: ClassTag](
     @transient sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]],
-    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) {
+    serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
 
   def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
     this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index e17824f..06acbba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -182,7 +182,7 @@ class NewCarbonDataLoadRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     @transient hadoopConf: Configuration)
-  extends CarbonRDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 600cd80..60052f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -58,7 +58,7 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
 class DataMapPruneRDD(sc: SparkContext,
     dataMapFormat: DistributableDataMapFormat,
     resolverIntf: FilterResolverIntf)
-  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil) {
+  extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 2157799..6a97477 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -29,7 +29,8 @@ case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 class DataLoadCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil,
+    prev.sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
index 9befcaa..bcca7ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
 class UpdateCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends CarbonRDD[T](prev.context, Nil) {
+  extends CarbonRDD[T](prev.context, Nil, prev.sparkContext.hadoopConfiguration) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e502c59a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 186d100..1ecd6e4 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -94,7 +94,7 @@ class StreamHandoffRDD[K, V](
     result: HandoffResult[K, V],
     carbonLoadModel: CarbonLoadModel,
     handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil) {
+) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")