Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 08:09:02 UTC

carbondata git commit: [CARBONDATA-2990] Queries slow down after some time due to broadcast issue

Repository: carbondata
Updated Branches:
  refs/heads/master 19097f272 -> 3c7b33992


[CARBONDATA-2990] Queries slow down after some time due to broadcast issue

Problem
It is observed that during consecutive runs of queries, execution slows down over time, degrading query performance.
No exception is thrown in the driver or executor logs, but the logs show that the time taken to broadcast the Hadoop configuration increases after every query run.

Analysis

This happens because CarbonData overrides Spark's SerializableConfiguration class. Spark registers its own class with the Kryo serializer, so serialization through Kryo is fast; CarbonData does not get that benefit because of the overridden class.
Spark's internal SizeEstimator computes the size of the broadcast object, and the overridden CarbonData class carries a few extra objects, which increases the computation time.
Solution
Use the Spark class instead of overriding it in CarbonData.
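
As a rough sketch of the idea (object and method names here are illustrative; the committed helpers are added to SparkSQLUtil in the diff below): broadcast Spark's own SerializableConfiguration, which Spark already registers with Kryo, instead of a Carbon copy of it. Spark's class is package-private to Spark, which is presumably why the helper lives under the org.apache.spark.sql.util package.

  package org.apache.spark.sql.util

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.SparkContext
  import org.apache.spark.broadcast.Broadcast
  // Spark's class; Spark registers it with the Kryo serializer, unlike the Carbon copy it replaces
  import org.apache.spark.util.SerializableConfiguration

  object HadoopConfBroadcastSketch {
    // Wrap the Hadoop conf in Spark's SerializableConfiguration and broadcast it once per job.
    def broadcastHadoopConf(sc: SparkContext,
        hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
      sc.broadcast(new SerializableConfiguration(hadoopConf))
    }
  }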

This closes #2803


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c7b3399
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c7b3399
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c7b3399

Branch: refs/heads/master
Commit: 3c7b33992e06d81fb47d81bf8ccf7884f845b3ff
Parents: 19097f2
Author: manishgupta88 <to...@gmail.com>
Authored: Mon Oct 8 19:38:54 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Oct 9 13:38:51 2018 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CsvRDDHelper.scala    |  4 +--
 .../load/DataLoadProcessBuilderOnSpark.scala    |  6 ++--
 .../load/DataLoadProcessorStepOnSpark.scala     |  6 ++--
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |  4 +--
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 33 --------------------
 .../apache/spark/sql/util/SparkSQLUtil.scala    | 21 ++++++++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  4 +--
 .../spark/sql/CarbonDictionaryDecoder.scala     |  8 ++---
 .../management/CarbonLoadDataCommand.scala      |  7 +++--
 .../command/mutation/DeleteExecution.scala      |  7 ++---
 .../command/mutation/HorizontalCompaction.scala |  8 ++---
 11 files changed, 46 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
index 8d6dd32..5511645 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -41,7 +42,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 import org.apache.carbondata.spark.util.CommonUtil
 
 object CsvRDDHelper {
@@ -110,7 +110,7 @@ object CsvRDDHelper {
     closePartition()
 
     // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(hadoopConf)
+    val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf)
     val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
       override def apply(file: PartitionedFile): Iterator[InternalRow] = {
         new Iterator[InternalRow] {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 2e74a94..923676c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,7 +36,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 /**
  * Use sortBy operator in spark to load the data
@@ -66,7 +66,7 @@ object DataLoadProcessBuilderOnSpark {
     val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
     val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")
 
-    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+    val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
     // 1. Input
     val inputRDD = originRDD
       .mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -121,7 +121,7 @@ object DataLoadProcessBuilderOnSpark {
     // 4. Write
     sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
       DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
-        writeStepRowCounter, conf))
+        writeStepRowCounter, conf.value.value))
 
     // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
     // not have any functional impact as spark automatically monitors the cache usage on each node

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index f17bd91..f1a12bf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -42,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
 import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
+import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 object DataLoadProcessorStepOnSpark {
@@ -230,8 +230,8 @@ object DataLoadProcessorStepOnSpark {
       index: Int,
       modelBroadcast: Broadcast[CarbonLoadModel],
       rowCounter: Accumulator[Int],
-      conf: Broadcast[SerializableConfiguration]) {
-    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
+      conf: Configuration) {
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
     var model: CarbonLoadModel = null
     var tableName: String = null
     var rowConverter: RowConverterImpl = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index 3a02f85..83cd59c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -49,8 +48,7 @@ abstract class CarbonRDD[T: ClassTag](
 
   @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
 
-  val config: Broadcast[SerializableConfiguration] = sparkContext
-    .broadcast(new SerializableConfiguration(hadoopConf))
+  val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 87c8e4c..6076e4a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -54,39 +54,6 @@ import org.apache.carbondata.processing.util.CarbonQueryUtil
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
 
-class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
-
-  @transient
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private def writeObject(out: ObjectOutputStream): Unit =
-    try {
-      out.defaultWriteObject()
-      value.write(out)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-
-
-  private def readObject(in: ObjectInputStream): Unit =
-    try {
-      value = new Configuration(false)
-      value.readFields(in)
-    } catch {
-      case e: IOException =>
-        LOGGER.error(e, "Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        LOGGER.error(e, "Exception encountered")
-        throw new IOException(e)
-    }
-}
-
 /**
  * This partition class use to split by Host
  *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index b7d47a0..9ffe6e1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -19,11 +19,14 @@ package org.apache.spark.sql.util
 
 import java.lang.reflect.Method
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.{SessionState, SQLConf}
-import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
+import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -99,4 +102,20 @@ object SparkSQLUtil {
       throw new UnsupportedOperationException("Spark version not supported")
     }
   }
+
+  /**
+   * Method to broadcast a variable using spark SerializableConfiguration class
+   *
+   * @param sparkContext
+   * @param hadoopConf
+   * @return
+   */
+  def broadCastHadoopConf(sparkContext: SparkContext,
+      hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
+    sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf))
+  }
+
+  def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
+    new SerializableConfiguration(hadoopConf)
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6350b50..0ec3bc6 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -728,7 +728,7 @@ object CarbonDataRDDFactory {
 
       // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
       // so segmentIdIndex=partitionId/parallelism, this has been verified.
-      val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+      val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
       partitionByRdd.map(_._2).mapPartitions { partition =>
         ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
         val partitionId = TaskContext.getPartitionId()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index d0ed56e..ff7ac60 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.optimizer.CarbonDecoderRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil
 import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
-import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo
 
 /**
  * It decodes the data.
@@ -76,8 +76,8 @@ case class CarbonDictionaryDecoder(
         (carbonTable.getTableName, carbonTable)
       }.toMap
 
-      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
       if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
         val dataTypes = child.output.map { attr => attr.dataType }
         child.execute().mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f7a5f42..43c8b86 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSou
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -78,7 +79,7 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
@@ -986,8 +987,8 @@ case class CarbonLoadDataCommand(
           array
         }
       }
-    val conf = sparkSession.sparkContext
-      .broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 4921b33..7e7f671 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,7 +50,6 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -120,9 +120,8 @@ object DeleteExecution {
         blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
         keyRdd.partitions.length)
 
-    val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-      .sessionState.newHadoopConf()))
-
+    val conf = SparkSQLUtil
+      .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
     val rdd = rowContRdd.join(keyRdd)
     res = rdd.mapPartitionsWithIndex(
       (index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c7b3399/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 66066ed..35fc3c3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -26,16 +26,16 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
-import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
 
 object HorizontalCompaction {
 
@@ -191,8 +191,8 @@ object HorizontalCompaction {
 
       val timestamp = factTimeStamp
       val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
-      val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
-        .sessionState.newHadoopConf()))
+      val conf = SparkSQLUtil
+        .broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
       val result = rdd1.mapPartitions(iter =>
         new Iterator[Seq[CarbonDataMergerUtilResult]] {
           ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)