You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/27 10:52:58 UTC

[2/5] carbondata git commit: Added set/reset commands in carbon to update/reset properties dynamically

Added set/reset commands in carbon to update/reset properties dynamically


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/95ce1da1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/95ce1da1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/95ce1da1

Branch: refs/heads/master
Commit: 95ce1da1e6a828255ca6385ae5ab16706e66483f
Parents: 28e2e17
Author: Manohar <ma...@gmail.com>
Authored: Mon Jun 12 18:06:25 2017 +0530
Committer: Manohar <ma...@gmail.com>
Committed: Tue Jun 27 14:39:51 2017 +0530

----------------------------------------------------------------------
 .../core/util/ThreadLocalSessionParams.java     | 34 +++++++++++++++
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  9 ++--
 .../spark/rdd/AlterTableDropColumnRDD.scala     | 10 ++---
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  8 +---
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  9 +---
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  9 +---
 .../spark/rdd/CarbonDropTableRDD.scala          | 12 ++---
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 34 ++++-----------
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 --
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  8 +---
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 46 ++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  8 +---
 .../spark/rdd/DataLoadCoalescedRDD.scala        | 15 +++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 42 +++++++-----------
 .../spark/rdd/UpdateCoalescedRDD.scala          | 10 ++---
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  4 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  7 ++-
 .../spark/sql/hive/CarbonStrategies.scala       |  4 +-
 .../execution/command/CarbonHiveCommands.scala  | 16 +------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 16 +++----
 .../sql/CarbonDatasourceHadoopRelation.scala    |  3 ++
 .../spark/sql/CarbonDictionaryDecoder.scala     |  5 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 12 ++++-
 .../org/apache/spark/sql/CarbonSource.scala     |  5 +--
 .../execution/CarbonLateDecodeStrategy.scala    |  3 +-
 .../execution/CastExpressionOptimization.scala  |  6 +--
 .../execution/command/CarbonHiveCommands.scala  | 26 ++++++++---
 .../sql/execution/command/DDLStrategy.scala     |  4 +-
 .../execution/command/carbonTableSchema.scala   | 16 +++----
 .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++++---
 .../spark/sql/hive/CarbonSessionState.scala     |  2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 15 ++++---
 32 files changed, 217 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
new file mode 100644
index 0000000..354a0ee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * Keeps the current SessionParams in an InheritableThreadLocal (child threads inherit the value)
+ */
+public class ThreadLocalSessionParams {
+  static final InheritableThreadLocal<SessionParams> threadLocal =
+      new InheritableThreadLocal<SessionParams>();
+
+  public static void setSessionParams(SessionParams sessionParams) {
+    threadLocal.set(sessionParams);
+  }
+
+  public static SessionParams getSessionParams() {
+    return threadLocal.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 61e1e61..7eea95d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,24 +50,21 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
+    carbonStorePath: String)
+  extends CarbonRDD[(Int, String)](sc, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
     }.toArray
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index ba91673..fde5cd6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * This is a partitioner class for dividing the newly added columns into partitions
@@ -49,9 +48,8 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    carbonStorePath: String)
+  extends CarbonRDD[(Int, String)](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
@@ -59,11 +57,9 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     }.toArray
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index c1a30b7..b63fc48 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -34,21 +33,18 @@ class CarbonCleanFilesRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[(V)] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index f7bed59..da391cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.DeletedLoadResult
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -40,12 +39,10 @@ class CarbonDeleteLoadByDateRDD[K, V](
     dimTableName: String,
     storePath: String,
     loadMetadataDetails: List[LoadMetadataDetails])
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {s =>
@@ -53,10 +50,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     new Iterator[(K, V)] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 3ef9cef..9e43d0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -35,11 +34,9 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {f =>
@@ -47,10 +44,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[V] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 54f8ea5..d1d49b9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -31,12 +30,10 @@ class CarbonDropTableRDD[V: ClassTag](
     valueClass: Value[V],
     databaseName: String,
     tableName: String)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map { s =>
@@ -44,12 +41,9 @@ class CarbonDropTableRDD[V: ClassTag](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
 
-    val iter = new Iterator[V] {
+      val iter = new Iterator[V] {
       // TODO: Clear Btree from memory
 
       var havePair = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 434fb3c..d0f9362 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -176,19 +176,15 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
 class CarbonAllDictionaryCombineRDD(
     prev: RDD[(String, Iterable[String])],
     model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
 
   override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
 
-  override def compute(split: Partition, context: TaskContext
+    override def internalCompute(split: Partition, context: TaskContext
   ): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     /*
      * for all dictionary, all columns need to encoding and checking
@@ -273,17 +269,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
 class CarbonBlockDistinctValuesCombineRDD(
     prev: RDD[Row],
     model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
@@ -338,16 +329,13 @@ class CarbonBlockDistinctValuesCombineRDD(
 class CarbonGlobalDictionaryGenerateRDD(
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends RDD[(Int, String, Boolean)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, String, Boolean)](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
-  override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
@@ -544,9 +532,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dimensions: Array[CarbonDimension],
     hdfsLocation: String,
     dictFolderPath: String)
-  extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
@@ -558,10 +544,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     result
   }
 
-  override def compute(split: Partition, context: TaskContext)
+  override def internalCompute(split: Partition, context: TaskContext)
   : Iterator[(Int, ColumnDistinctValues)] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
     val primDimension = theSplit.preDefDictDimension
     // read the column dict data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 38e3680..277005b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -51,8 +50,6 @@ class CarbonIUDMergerRDD[K, V](
     carbonMergerMapping,
     confExecutorsTemp) {
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index dec3ee3..908043a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -59,7 +59,7 @@ class CarbonMergerRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
@@ -74,12 +74,8 @@ class CarbonMergerRDD[K, V](
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val iter = new Iterator[(K, V)] {
 
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
new file mode 100644
index 0000000..e00dd0f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+
+/**
+ * Base RDD that captures the constructing thread's SessionParams and sets them again in compute()
+ */
+abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+
+  val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams
+
+  /** Construct an RDD with just a one-to-one dependency on one parent */
+  def this(@transient oneParent: RDD[_]) =
+    this (oneParent.context, List(new OneToOneDependency(oneParent)))
+
+  // Subclasses implement their compute logic here; compute() calls it after setting sessionParams
+  def internalCompute(split: Partition, context: TaskContext): Iterator[T]
+
+  final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    internalCompute(split, context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2c10e65..3868342 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -54,7 +54,7 @@ class CarbonScanRDD(
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient carbonTable: CarbonTable)
-  extends RDD[InternalRow](sc, Nil) {
+  extends CarbonRDD[InternalRow](sc, Nil) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
@@ -67,8 +67,6 @@ class CarbonScanRDD(
 
   private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -175,15 +173,13 @@ class CarbonScanRDD(
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
       )
     }
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
 
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
index 5da0835..b2d04ac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -21,26 +21,21 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 
 class DataLoadCoalescedRDD[T: ClassTag](
-  @transient var prev: RDD[T],
-  nodeList: Array[String])
-    extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    @transient var prev: RDD[T],
+    nodeList: Array[String])
+  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
-
     new Iterator[DataLoadPartitionWrap[T]] {
       val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
       def hasNext = iter.hasNext

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 5790369..129c642 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.spark.rdd
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
@@ -127,16 +126,12 @@ class SparkPartitionLoader(model: CarbonLoadModel,
 
   var storeLocation: String = ""
 
-  def initialize(addedProperies: util.Map[String, String]): Unit = {
+  def initialize(): Unit = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
@@ -177,7 +172,7 @@ class NewCarbonDataLoadRDD[K, V](
     loadCount: Integer,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     isTableSplitPartition: Boolean)
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
@@ -190,8 +185,6 @@ class NewCarbonDataLoadRDD[K, V](
   private val confBroadcast =
     sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     if (isTableSplitPartition) {
       // for table split partition
@@ -222,7 +215,7 @@ class NewCarbonDataLoadRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       var partitionID = "0"
@@ -246,7 +239,7 @@ class NewCarbonDataLoadRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -391,17 +384,16 @@ class NewCarbonDataLoadRDD[K, V](
  *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor
  */
 class NewDataFrameLoaderRDD[K, V](
-                                   sc: SparkContext,
-                                   result: DataLoadResult[K, V],
-                                   carbonLoadModel: CarbonLoadModel,
-                                   loadCount: Integer,
-                                   tableCreationTime: Long,
-                                   schemaLastUpdatedTime: Long,
-                                   prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val partitionID = "0"
@@ -438,7 +430,7 @@ class NewDataFrameLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
@@ -593,11 +585,9 @@ class PartitionTableDataLoaderRDD[K, V](
     loadCount: Integer,
     tableCreationTime: Long,
     schemaLastUpdatedTime: Long,
-    prev: RDD[Row]) extends RDD[(K, V)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val partitionID = "0"
@@ -625,7 +615,7 @@ class PartitionTableDataLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: BadRecordFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index 30050f7..1025da7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -22,25 +22,21 @@ import scala.reflect.ClassTag
 import org.apache.spark._
 import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
 
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 // This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
 
 class UpdateCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends RDD[T](prev.context, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[T](prev.context, Nil) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[T] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     // This iterator combines data from all the parent partitions
     new Iterator[T] {
       val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 6b94894..bcfc096 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.util
-
 import scala.collection.mutable
 
 import org.apache.spark.TaskContext
@@ -54,7 +52,7 @@ object UpdateDataLoad {
         segId,
         loadMetadataDetails)
       // Intialize to set carbon properties
-      loader.initialize(new util.HashMap)
+      loader.initialize()
 
       loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
       new DataLoadExecutor().execute(carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0e6153f..2fc93e6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
+import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 private[sql] case class CarbonDatasourceHadoopRelation(
@@ -94,7 +95,6 @@ private[sql] case class CarbonDatasourceHadoopRelation(
     requiredColumns.foreach(projection.addColumn)
     CarbonInputFormat.setColumnProjection(conf, projection)
     CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
-
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
       new SerializableConfiguration(conf),
       absIdentifier,
@@ -120,7 +120,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   identifier: AbsoluteTableIdentifier,
   inputFormatClass: Class[_ <: CarbonInputFormat[V]],
   valueClass: Class[V])
-  extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
+  extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -128,8 +128,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   }
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
-  @DeveloperApi
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
     context: TaskContext): Iterator[V] = {
     val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 7bfd742..f0cd33b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan}
+import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
@@ -316,8 +316,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         } else {
           ExecutedCommand(HiveNativeCommand(sql)) :: Nil
         }
-      case set@SetCommand(kv) =>
-        ExecutedCommand(CarbonSetCommand(set)) :: Nil
       case _ =>
         Nil
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index d047b20..0f42940 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.{RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.execution.command.DropTableCommand
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 
-import org.apache.carbondata.core.util.CarbonProperties
-
 private[hive] case class CreateDatabaseCommand(dbName: String,
     command: HiveNativeCommand) extends RunnableCommand {
   def run(sqlContext: SQLContext): Seq[Row] = {
@@ -55,15 +53,3 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String,
     rows
   }
 }
-
-case class CarbonSetCommand(command: SetCommand)
-  extends RunnableCommand {
-
-  override val output = command.output
-
-  override def run(sparkSession: SQLContext): Seq[Row] = {
-    val rows = command.run(sparkSession)
-    CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1))
-    rows
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2b77654..48af516 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -128,7 +128,7 @@ object CarbonDataRDDFactory {
       isCompactionTriggerByDDl
     )
 
-    val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
           CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
         )
@@ -275,8 +275,8 @@ object CarbonDataRDDFactory {
               exception = e
           }
           // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).
-            sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
               ).equalsIgnoreCase("true")
 
@@ -397,8 +397,8 @@ object CarbonDataRDDFactory {
         }
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession)
-          .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
               CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
             )
             .equalsIgnoreCase("true")
@@ -1042,8 +1042,7 @@ object CarbonDataRDDFactory {
     val timeStampFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
-        .sessionParams.getProperty(CarbonCommonConstants
+      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       new SimpleDateFormat(timestampFormatString)
     }
@@ -1051,8 +1050,7 @@ object CarbonDataRDDFactory {
     val dateFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
-        .sessionParams.getProperty(CarbonCommonConstants
+      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
       new SimpleDateFormat(dateFormatString)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b0044d7..7c096d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.processing.merger.TableMeta
@@ -52,6 +53,8 @@ case class CarbonDatasourceHadoopRelation(
       absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
     .asInstanceOf[CarbonRelation]
 
+  val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+  ThreadLocalSessionParams.setSessionParams(sessionParams)
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 49cf54f..bd1c8b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 /**
  * It decodes the data.
@@ -444,7 +445,7 @@ class CarbonDecoderRDD(
     prev: RDD[InternalRow],
     output: Seq[Attribute],
     sparkSession: SparkSession)
-  extends RDD[InternalRow](prev) {
+  extends CarbonRDD[InternalRow](prev) {
 
   private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
 
@@ -513,7 +514,7 @@ class CarbonDecoderRDD(
     dictIds
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0851ec2..78820ea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -21,10 +21,11 @@ import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
+import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -48,11 +49,18 @@ class CarbonEnv {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
       sessionParams = new SessionParams()
+      ThreadLocalSessionParams.setSessionParams(sessionParams)
+      val config = new CarbonSQLConf(sparkSession)
+      if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
+        config.addDefaultCarbonParams()
+      }
+      // add session params after adding DefaultCarbonParams
+      config.addDefaultCarbonSessionParams()
       carbonMetastore = {
         val storePath =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sparkSession.conf, storePath, sessionParams)
+        new CarbonMetastore(sparkSession.conf, storePath)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3079c84..1c16143 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+        val storePath = CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.STORE_LOCATION)
         val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
@@ -77,8 +77,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
-      .getProperty(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 8d0b4ea..4605914 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -520,8 +520,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
         System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
       } else {
-        CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
-          .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
           CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index 805a4df..a8985b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -24,7 +24,8 @@ import java.util.{Locale, TimeZone}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
-import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources}
+import org.apache.spark.sql.CastExpr
+import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -34,8 +35,7 @@ object CastExpressionOptimization {
 
 
   def typeCastStringToLong(v: Any): Any = {
-    val parser: SimpleDateFormat = new SimpleDateFormat(
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams
+    val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 627de02..a4feead 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand}
 
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -49,10 +49,26 @@ case class CarbonSetCommand(command: SetCommand)
   override val output = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val rows = command.run(sparkSession)
-    CarbonEnv.getInstance(sparkSession).sessionParams
-      .addProperty(rows.head.getString(0), rows.head.getString(1))
-    rows
+    val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams
+    command.kv match {
+      case Some((key, Some(value))) =>
+        val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
+        if (isCarbonProperty) {
+          sessionParms.addProperty(key, value)
+        }
+      case _ =>
+
+    }
+    command.run(sparkSession)
   }
 }
 
+case class CarbonResetCommand()
+  extends RunnableCommand {
+  override val output = ResetCommand.output
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    CarbonEnv.getInstance(sparkSession).sessionParams.clear()
+    ResetCommand.run(sparkSession)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 35be543..7d0215f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand}
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
       case set@SetCommand(kv) =>
         ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
+      case reset@ResetCommand =>
+        ExecutedCommandExec(CarbonResetCommand()) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f1fd05b..0064c21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams
+    var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
       )
@@ -359,8 +359,7 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonEnv.getInstance(sparkSession).sessionParams
-      .addProperty("zookeeper.enable.lock", "false")
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -409,7 +408,7 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams
+      val badRecordActionValue = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
       val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
@@ -429,12 +428,11 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession)
-        .sessionParams.getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams.
-        getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
@@ -536,7 +534,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams
+          val dictionaryServerPort = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 54cffc2..04a94ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.reader.ThriftReader
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -201,15 +201,18 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams:
     // if zookeeper is configured as carbon lock type.
     val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
     if (null != zookeeperurl) {
-      sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
     }
     if (metadataPath == null) {
       return null
     }
     // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+    if (null == CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
         FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE,
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.LOCK_TYPE,
           CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
         )
       LOGGER.info("Default lock type HDFSLOCK is configured")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 156a12e..4aef118 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -115,7 +115,7 @@ class CarbonSessionCatalog(
  */
 class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
 
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
   experimentalMethods.extraStrategies =
     Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 258920b..3412fb0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,30 +18,33 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext,
-TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
-PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
- * Concrete parser for Spark SQL statements and carbon specific statements
+ * Concrete parser for Spark SQL statements and carbon specific
+ * statements
  */
-class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
 
   val astBuilder = new CarbonSqlAstBuilder(conf)
 
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
+    val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+    ThreadLocalSessionParams.setSessionParams(sessionParams)
     try {
       super.parsePlan(sqlText)
     } catch {