Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 18:21:59 UTC
[11/36] carbondata git commit: [CARBONDATA-2080] [S3-Implementation] Propagated hadoopConf from driver to executor for s3 implementation in cluster mode.
[CARBONDATA-2080] [S3-Implementation] Propagated hadoopConf from driver to executor for s3 implementation in cluster mode.
Problem: hadoopConf was not getting propagated from the driver to the executors, which is why data loads were failing in a distributed environment.
Solution: Pass the Hadoop conf into the base class CarbonRDD, serialize it along with the RDD, and rebuild it on each executor before compute runs (a minimal sketch of the pattern follows below).
How to verify this PR:
Execute a data load in cluster mode against an S3 store location; it should succeed (a hedged example appears at the end of this mail).
This closes #1860
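----------------------------------------------------------------------
For readers who want the gist without paging through the whole diff, here is a minimal sketch of the pattern the patch applies in CarbonRDD. The actual change additionally compresses the serialized bytes via CarbonData's CompressorFactory and calls CarbonInputFormatUtil.setS3Configurations(getConf) at the top of compute(); the sketch keeps only the core Writable round-trip, and the names serializeConf/deserializeConf are illustrative, not taken from the patch.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

object HadoopConfShipping {

  // Driver side: Configuration is not java.io.Serializable, but it is a
  // Hadoop Writable, so it can be written to any DataOutput. Capturing the
  // resulting bytes in a field of the RDD lets Spark ship them to every
  // executor along with the serialized RDD itself.
  def serializeConf(conf: Configuration): Array[Byte] = {
    val bao = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bao)
    conf.write(oos) // Writable.write(DataOutput)
    oos.close()
    bao.toByteArray
  }

  // Executor side (inside compute()): rebuild the Configuration from the
  // shipped bytes, so that S3 credentials and endpoints set on the driver
  // are visible where the load tasks actually run.
  def deserializeConf(bytes: Array[Byte]): Configuration = {
    val conf = new Configuration(false) // don't load local defaults
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    conf.readFields(ois) // Writable.readFields(DataInput)
    ois.close()
    conf
  }
}
----------------------------------------------------------------------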
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c74874f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c74874f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c74874f
Branch: refs/heads/carbonstore-rebase
Commit: 9c74874f069dca06e71e872fda5c2f24b4fa00da
Parents: bfa9a2c
Author: Jatin <ja...@knoldus.in>
Authored: Thu Jan 25 16:53:00 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Feb 10 02:20:10 2018 +0800
----------------------------------------------------------------------
.../spark/rdd/AlterTableAddColumnRDD.scala | 2 +-
.../spark/rdd/AlterTableDropColumnRDD.scala | 2 +-
.../spark/rdd/CarbonCleanFilesRDD.scala | 2 +-
.../spark/rdd/CarbonDeleteLoadByDateRDD.scala | 2 +-
.../spark/rdd/CarbonDeleteLoadRDD.scala | 2 +-
.../spark/rdd/CarbonDropPartitionRDD.scala | 4 +--
.../spark/rdd/CarbonDropTableRDD.scala | 2 +-
.../spark/rdd/CarbonGlobalDictionaryRDD.scala | 3 +-
.../spark/rdd/CarbonMergeFilesRDD.scala | 0
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +-
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 32 ++++++++++++++++++--
.../spark/rdd/NewCarbonDataLoadRDD.scala | 2 +-
.../carbondata/spark/rdd/SparkDataMapJob.scala | 2 +-
.../apache/spark/rdd/DataLoadCoalescedRDD.scala | 3 +-
.../apache/spark/rdd/UpdateCoalescedRDD.scala | 2 +-
.../carbondata/streaming/StreamHandoffRDD.scala | 2 +-
16 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 56a66b9..7c1edea 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,7 +50,7 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
class AlterTableAddColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
identifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+ extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 248f351..e14524e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -48,7 +48,7 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
class AlterTableDropColumnRDD[K, V](sc: SparkContext,
@transient newColumns: Seq[ColumnSchema],
carbonTableIdentifier: AbsoluteTableIdentifier)
- extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
+ extends CarbonRDD[(Int, SegmentStatus)](sc, Nil, sc.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
newColumns.zipWithIndex.map { column =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 32523d8..9936a2a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -33,7 +33,7 @@ class CarbonCleanFilesRDD[V: ClassTag](
databaseName: String,
tableName: String,
partitioner: Partitioner)
- extends CarbonRDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index 45271a7..b11dfad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -39,7 +39,7 @@ class CarbonDeleteLoadByDateRDD[K, V](
dimTableName: String,
storePath: String,
loadMetadataDetails: List[LoadMetadataDetails])
- extends CarbonRDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 9a1ef33..759ed42 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -34,7 +34,7 @@ class CarbonDeleteLoadRDD[V: ClassTag](
databaseName: String,
tableName: String,
partitioner: Partitioner)
- extends CarbonRDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
override def getPartitions: Array[Partition] = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..800cc36 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -47,7 +47,7 @@ class CarbonDropPartitionRDD(
partitions: Seq[String],
uniqueId: String,
partialMatch: Boolean)
- extends CarbonRDD[String](sc, Nil) {
+ extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
segments.zipWithIndex.map {s =>
@@ -105,7 +105,7 @@ class CarbonDropPartitionCommitRDD(
success: Boolean,
uniqueId: String,
partitions: Seq[String])
- extends CarbonRDD[String](sc, Nil) {
+ extends CarbonRDD[String](sc, Nil, sc.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
segments.zipWithIndex.map {s =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 652720c..f327d88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -30,7 +30,7 @@ class CarbonDropTableRDD[V: ClassTag](
valueClass: Value[V],
databaseName: String,
tableName: String)
- extends CarbonRDD[V](sc, Nil) {
+ extends CarbonRDD[V](sc, Nil, sc.hadoopConfiguration) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 7acf4e2..cf22b3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -506,7 +506,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
table: CarbonTableIdentifier,
dimensions: Array[CarbonDimension],
dictFolderPath: String)
- extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
+ extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil,
+ sparkContext.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
val primDimensions = dictionaryLoadModel.primDimensions
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 7eff227..1b9363d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -62,7 +62,7 @@ class CarbonMergerRDD[K, V](
carbonLoadModel: CarbonLoadModel,
carbonMergerMapping: CarbonMergerMapping,
confExecutorsTemp: String)
- extends CarbonRDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
sc.setLocalProperty("spark.job.interruptOnCancel", "true")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index bf46f67..6f248d2 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -17,20 +17,26 @@
package org.apache.carbondata.spark.rdd
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
+
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util._
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
/**
* This RDD maintains session level ThreadLocal
*/
abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+ @transient private var deps: Seq[Dependency[_]],
+ @transient hadoopConf: Configuration) extends RDD[T](sc, deps) {
val carbonSessionInfo: CarbonSessionInfo = {
var info = ThreadLocalSessionInfo.getCarbonSessionInfo
@@ -42,14 +48,24 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
info
}
+ private val confBytes = {
+ val bao = new ByteArrayOutputStream()
+ val oos = new ObjectOutputStream(bao)
+ hadoopConf.write(oos)
+ oos.close()
+ CompressorFactory.getInstance().getCompressor.compressByte(bao.toByteArray)
+ }
+
/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
- this (oneParent.context, List(new OneToOneDependency(oneParent)))
+ this (oneParent.context, List(new OneToOneDependency(oneParent)),
+ oneParent.sparkContext.hadoopConfiguration)
// RDD compute logic should be here
def internalCompute(split: Partition, context: TaskContext): Iterator[T]
final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ CarbonInputFormatUtil.setS3Configurations(getConf)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
val carbonTaskInfo = new CarbonTaskInfo
@@ -59,6 +75,16 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
internalCompute(split, context)
}
+
+ private def getConf: Configuration = {
+ val configuration = new Configuration(false)
+ val bai = new ByteArrayInputStream(CompressorFactory.getInstance().getCompressor
+ .unCompressByte(confBytes))
+ val ois = new ObjectInputStream(bai)
+ configuration.readFields(ois)
+ ois.close()
+ configuration
+ }
}
/**
@@ -67,7 +93,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
abstract class CarbonRDDWithTableInfo[T: ClassTag](
@transient sc: SparkContext,
@transient private var deps: Seq[Dependency[_]],
- serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps) {
+ serializedTableInfo: Array[Byte]) extends CarbonRDD[T](sc, deps, sc.hadoopConfiguration) {
def this(@transient oneParent: RDD[_], serializedTableInfo: Array[Byte]) =
this (oneParent.context, List(new OneToOneDependency(oneParent)), serializedTableInfo)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index e17824f..06acbba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -182,7 +182,7 @@ class NewCarbonDataLoadRDD[K, V](
carbonLoadModel: CarbonLoadModel,
blocksGroupBy: Array[(String, Array[BlockDetails])],
@transient hadoopConf: Configuration)
- extends CarbonRDD[(K, V)](sc, Nil) {
+ extends CarbonRDD[(K, V)](sc, Nil, hadoopConf) {
sc.setLocalProperty("spark.scheduler.pool", "DDL")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 600cd80..60052f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -58,7 +58,7 @@ class DataMapRDDPartition(rddId: Int, idx: Int, val inputSplit: InputSplit) exte
class DataMapPruneRDD(sc: SparkContext,
dataMapFormat: DistributableDataMapFormat,
resolverIntf: FilterResolverIntf)
- extends CarbonRDD[(ExtendedBlocklet)](sc, Nil) {
+ extends CarbonRDD[(ExtendedBlocklet)](sc, Nil, sc.hadoopConfiguration) {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
index 2157799..6a97477 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
@@ -29,7 +29,8 @@ case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
class DataLoadCoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
nodeList: Array[String])
- extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
+ extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil,
+ prev.sparkContext.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
index 9befcaa..bcca7ed 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/UpdateCoalescedRDD.scala
@@ -29,7 +29,7 @@ import org.apache.carbondata.spark.rdd.CarbonRDD
class UpdateCoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
nodeList: Array[String])
- extends CarbonRDD[T](prev.context, Nil) {
+ extends CarbonRDD[T](prev.context, Nil, prev.sparkContext.hadoopConfiguration) {
override def getPartitions: Array[Partition] = {
new DataLoadPartitionCoalescer(prev, nodeList).run
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c74874f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 5c6165d..eb39422 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -96,7 +96,7 @@ class StreamHandoffRDD[K, V](
result: HandoffResult[K, V],
carbonLoadModel: CarbonLoadModel,
handOffSegmentId: String
-) extends CarbonRDD[(K, V)](sc, Nil) {
+) extends CarbonRDD[(K, V)](sc, Nil, sc.hadoopConfiguration) {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
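----------------------------------------------------------------------
Finally, a hedged sketch of the verification step from the commit message, assuming a CarbonData-enabled SparkSession submitted with --deploy-mode cluster (depending on the release you may need CarbonSession.getOrCreateCarbonSession instead of a plain SparkSession). The bucket, paths, table schema, and credential handling below are placeholders, not part of the patch:

import org.apache.spark.sql.SparkSession

// Assumption: carbondata jars are on the classpath and AWS credentials are
// exported in the environment; all names below are illustrative.
val spark = SparkSession.builder()
  .appName("CarbonS3LoadCheck")
  .config("spark.hadoop.fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
  .getOrCreate()

// Table whose store location is on S3; before this patch the executors
// never saw the s3a settings above, so the load failed in cluster mode.
spark.sql(
  """CREATE TABLE IF NOT EXISTS s3_check (id INT, name STRING)
    |STORED BY 'carbondata'
    |LOCATION 's3a://my-bucket/carbon/s3_check'""".stripMargin)

spark.sql("LOAD DATA INPATH 'hdfs:///data/sample.csv' INTO TABLE s3_check")
----------------------------------------------------------------------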