You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/27 10:52:57 UTC

[1/5] carbondata git commit: Adding session based properties

Repository: carbondata
Updated Branches:
  refs/heads/master 2234ec8b1 -> 88522912b


Adding session based properties

Added set command in carbon to update properties dynamically


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28e2e171
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28e2e171
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28e2e171

Branch: refs/heads/master
Commit: 28e2e171db578e2467be55939d2da9a5f1b70d09
Parents: 2234ec8
Author: ravipesala <ra...@gmail.com>
Authored: Thu May 18 15:04:17 2017 +0530
Committer: Manohar <ma...@gmail.com>
Committed: Tue Jun 27 14:39:50 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/util/CarbonProperties.java  | 23 +++++++
 .../carbondata/core/util/SessionParams.java     | 70 ++++++++++++++++++++
 .../testsuite/commands/SetCommandTestCase.scala | 34 ++++++++++
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  4 ++
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  5 ++
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  5 ++
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  5 ++
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  5 ++
 .../spark/rdd/CarbonDropTableRDD.scala          |  6 ++
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 17 ++++-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 +
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  4 ++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  4 ++
 .../spark/rdd/DataLoadCoalescedRDD.scala        |  6 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 17 +++--
 .../spark/rdd/UpdateCoalescedRDD.scala          |  7 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  4 +-
 .../spark/sql/hive/CarbonStrategies.scala       |  4 +-
 .../execution/command/CarbonHiveCommands.scala  | 16 ++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 16 +++--
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  9 ++-
 .../org/apache/spark/sql/CarbonSource.scala     |  5 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  3 +-
 .../execution/CastExpressionOptimization.scala  |  6 +-
 .../execution/command/CarbonHiveCommands.scala  | 18 ++++-
 .../sql/execution/command/DDLStrategy.scala     | 10 +--
 .../execution/command/carbonTableSchema.scala   | 16 +++--
 .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++---
 28 files changed, 290 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 90c6ffa..0142e38 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -47,6 +49,11 @@ public final class CarbonProperties {
   private Properties carbonProperties;
 
   /**
+   * Added properties on the fly.
+   */
+  private Map<String, String> setProperties = new HashMap<>();
+
+  /**
    * Private constructor this will call load properties method to load all the
    * carbon properties in memory.
    */
@@ -447,10 +454,26 @@ public final class CarbonProperties {
    * @return properties value
    */
   public CarbonProperties addProperty(String key, String value) {
+    setProperties.put(key, value);
     carbonProperties.setProperty(key, value);
     return this;
   }
 
+  /**
+   * Get all the properties added on the fly through addProperty/setProperties.
+   * @return the internal map of added properties itself (not a copy)
+   */
+  public Map<String, String> getAddedProperies() {
+    return setProperties;
+  }
+
+  public void setProperties(Map<String, String> newProperties) { // bulk form of addProperty
+    setProperties.putAll(newProperties); // keep them retrievable via getAddedProperies()
+    for (Map.Entry<String, String> entry : newProperties.entrySet()) {
+      carbonProperties.setProperty(entry.getKey(), entry.getValue());
+    }
+  }
+
   private ColumnarFormatVersion getDefaultFormatVersion() {
     return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
new file mode 100644
index 0000000..781b898
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -0,0 +1,70 @@
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Session-level property overrides; lookups fall back to the global CarbonProperties.
+ */
+public class SessionParams implements Serializable {
+
+  protected transient CarbonProperties properties; // transient: null after deserialization
+
+  private Map<String, String> sProps;
+
+  public SessionParams() {
+    sProps = new HashMap<>();
+    properties = CarbonProperties.getInstance();
+  }
+
+  public SessionParams(SessionParams sessionParams) {
+    this();
+    sProps.putAll(sessionParams.sProps); // copy the entries into this instance's own map
+  }
+
+  /**
+   * Returns the session value for the key, else the global CarbonProperties value.
+   * @param key property name
+   * @return session override if present, otherwise the global lookup result
+   */
+  public String getProperty(String key) {
+    String s = sProps.get(key);
+    // No session value: use the global one (properties is null once deserialized).
+    if (s == null && properties != null) {
+      s = properties.getProperty(key);
+    }
+    return s;
+  }
+
+  /**
+   * Returns the session value for the key; when the session has none, delegates to
+   * CarbonProperties.getProperty(key, defaultValue).
+   * @param key property name
+   * @param defaultValue default handed to the global lookup
+   * @return the resolved value
+   */
+  public String getProperty(String key, String defaultValue) {
+    String value = sProps.get(key);
+    if (value == null) {
+      value = properties != null ? properties.getProperty(key, defaultValue) : defaultValue;
+    }
+    return value;
+  }
+
+  /**
+   * Stores the given key/value pair in this session's property map.
+   * @param key property name
+   * @param value property value to associate with the key
+   * @return this instance, so calls can be chained
+   */
+  public SessionParams addProperty(String key, String value) {
+    sProps.put(key, value);
+    return this;
+  }
+
+  public void setProperties(Map<String, String> newProperties) {
+    sProps.putAll(newProperties);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
new file mode 100644
index 0000000..28e2dbf
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.commands
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.util.CarbonProperties
+
+class SetCommandTestCase extends QueryTest with BeforeAndAfterAll {
+
+  test("test set command") {
+    sql("set key1=value1")
+    // `==` is null-safe, so a null result fails the assertion rather than throwing an NPE.
+    val carbonValue = CarbonProperties.getInstance().getProperty("key1")
+    assert(carbonValue == "value1", "Set command does not work")
+    assert(sqlContext.getConf("key1") == "value1", "Set command does not work")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index d81ed30..61e1e61 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -55,6 +55,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
@@ -64,6 +66,8 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index 53796bb..ba91673 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * This is a partitioner class for dividing the newly added columns into partitions
@@ -50,6 +51,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new DropColumnPartition(id, column._2, column._1)
@@ -59,6 +62,8 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index 9cc46c1..c1a30b7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -37,6 +38,8 @@ class CarbonCleanFilesRDD[V: ClassTag](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
@@ -44,6 +47,8 @@ class CarbonCleanFilesRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[(V)] {
+      // Add the properties added in driver to executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index f9a7bdd..f7bed59 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.DeletedLoadResult
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -43,6 +44,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {s =>
@@ -52,6 +55,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     new Iterator[(K, V)] {
+      // Add the properties added in driver to executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 26e1abc..3ef9cef 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -37,6 +38,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
   extends RDD[V](sc, Nil) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {f =>
@@ -46,6 +49,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[V] {
+      // Add the properties added in driver to executor.
+      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index dc63098..54f8ea5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,6 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -34,6 +35,8 @@ class CarbonDropTableRDD[V: ClassTag](
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map { s =>
@@ -43,6 +46,9 @@ class CarbonDropTableRDD[V: ClassTag](
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
 
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
+
     val iter = new Iterator[V] {
       // TODO: Clear Btree from memory
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 1e33188..434fb3c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -178,6 +178,8 @@ class CarbonAllDictionaryCombineRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
@@ -185,7 +187,8 @@ class CarbonAllDictionaryCombineRDD(
   override def compute(split: Partition, context: TaskContext
   ): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     /*
      * for all dictionary, all columns need to encoding and checking
@@ -272,11 +275,15 @@ class CarbonBlockDistinctValuesCombineRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, ColumnDistinctValues)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = firstParent[Row].partitions
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
@@ -333,10 +340,14 @@ class CarbonGlobalDictionaryGenerateRDD(
     model: DictionaryLoadModel)
   extends RDD[(Int, String, Boolean)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
@@ -535,6 +546,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dictFolderPath: String)
   extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
     val primDimLength = primDimensions.length
@@ -547,6 +560,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
 
   override def compute(split: Partition, context: TaskContext)
   : Iterator[(Int, ColumnDistinctValues)] = {
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
     val primDimension = theSplit.preDefDictDimension
     // read the column dict data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 277005b..38e3680 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -50,6 +51,8 @@ class CarbonIUDMergerRDD[K, V](
     carbonMergerMapping,
     confExecutorsTemp) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index caa389a..dec3ee3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -74,8 +74,12 @@ class CarbonMergerRDD[K, V](
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     val iter = new Iterator[(K, V)] {
 
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 4807b90..2c10e65 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -67,6 +67,8 @@ class CarbonScanRDD(
 
   private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -180,6 +182,8 @@ class CarbonScanRDD(
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
       )
     }
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
 
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
index 7395e43..5da0835 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -21,6 +21,8 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 
 class DataLoadCoalescedRDD[T: ClassTag](
@@ -28,12 +30,16 @@ class DataLoadCoalescedRDD[T: ClassTag](
   nodeList: Array[String])
     extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
 
     new Iterator[DataLoadPartitionWrap[T]] {
       val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 058c5c6..5790369 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -20,6 +20,7 @@ package org.apache.carbondata.spark.rdd
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
+import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
@@ -126,12 +127,16 @@ class SparkPartitionLoader(model: CarbonLoadModel,
 
   var storeLocation: String = ""
 
-  def initialize(): Unit = {
+  def initialize(addedProperies: util.Map[String, String]): Unit = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
@@ -185,6 +190,8 @@ class NewCarbonDataLoadRDD[K, V](
   private val confBroadcast =
     sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     if (isTableSplitPartition) {
       // for table split partition
@@ -239,7 +246,7 @@ class NewCarbonDataLoadRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -392,6 +399,7 @@ class NewDataFrameLoaderRDD[K, V](
                                    schemaLastUpdatedTime: Long,
                                    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -430,7 +438,7 @@ class NewDataFrameLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
@@ -587,6 +595,7 @@ class PartitionTableDataLoaderRDD[K, V](
     schemaLastUpdatedTime: Long,
     prev: RDD[Row]) extends RDD[(K, V)](prev) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
@@ -616,7 +625,7 @@ class PartitionTableDataLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize()
+        loader.initialize(addedProperies)
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: BadRecordFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index 67e094a..30050f7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -22,6 +22,8 @@ import scala.reflect.ClassTag
 import org.apache.spark._
 import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 // This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
 
 class UpdateCoalescedRDD[T: ClassTag](
@@ -29,13 +31,16 @@ class UpdateCoalescedRDD[T: ClassTag](
     nodeList: Array[String])
   extends RDD[T](prev.context, Nil) {
 
+  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
   override def compute(split: Partition,
       context: TaskContext): Iterator[T] = {
-
+    // Add the properties added in driver to executor.
+    CarbonProperties.getInstance().setProperties(addedProperies)
     // This iterator combines data from all the parent partitions
     new Iterator[T] {
       val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index bcfc096..6b94894 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.util
+
 import scala.collection.mutable
 
 import org.apache.spark.TaskContext
@@ -52,7 +54,7 @@ object UpdateDataLoad {
         segId,
         loadMetadataDetails)
       // Intialize to set carbon properties
-      loader.initialize()
+      loader.initialize(new util.HashMap)
 
       loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
       new DataLoadExecutor().execute(carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index f0cd33b..7bfd742 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
+import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
@@ -316,6 +316,8 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         } else {
           ExecutedCommand(HiveNativeCommand(sql)) :: Nil
         }
+      case set@SetCommand(kv) =>
+        ExecutedCommand(CarbonSetCommand(set)) :: Nil
       case _ =>
         Nil
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 0f42940..d047b20 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.{RunnableCommand, SetCommand}
 import org.apache.spark.sql.execution.command.DropTableCommand
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 
+import org.apache.carbondata.core.util.CarbonProperties
+
 private[hive] case class CreateDatabaseCommand(dbName: String,
     command: HiveNativeCommand) extends RunnableCommand {
   def run(sqlContext: SQLContext): Seq[Row] = {
@@ -53,3 +55,15 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String,
     rows
   }
 }
+
+/** Runs Spark's SET, then copies only an explicit `SET key=value` into CarbonProperties. */
+case class CarbonSetCommand(command: SetCommand) extends RunnableCommand {
+  override val output = command.output
+
+  override def run(sparkSession: SQLContext): Seq[Row] = {
+    val rows = command.run(sparkSession)
+    // Bare `SET`, `SET -v` and `SET key` are reads (rows may even be empty): nothing to copy.
+    for ((key, Some(value)) <- command.kv) CarbonProperties.getInstance().addProperty(key, value)
+    rows
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 48af516..2b77654 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -128,7 +128,7 @@ object CarbonDataRDDFactory {
       isCompactionTriggerByDDl
     )
 
-    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+    val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
         .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
           CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
         )
@@ -275,8 +275,8 @@ object CarbonDataRDDFactory {
               exception = e
           }
           // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).
+            sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
               ).equalsIgnoreCase("true")
 
@@ -397,8 +397,8 @@ object CarbonDataRDDFactory {
         }
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession)
+          .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
               CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
             )
             .equalsIgnoreCase("true")
@@ -1042,7 +1042,8 @@ object CarbonDataRDDFactory {
     val timeStampFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+      val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
+        .sessionParams.getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       new SimpleDateFormat(timestampFormatString)
     }
@@ -1050,7 +1051,8 @@ object CarbonDataRDDFactory {
     val dateFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+      val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
+        .sessionParams.getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
       new SimpleDateFormat(dateFormatString)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index b46488c..0851ec2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, SessionParams}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -35,6 +35,8 @@ class CarbonEnv {
 
   var carbonMetastore: CarbonMetastore = _
 
+  var sessionParams: SessionParams = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -45,11 +47,12 @@ class CarbonEnv {
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
+      sessionParams = new SessionParams()
       carbonMetastore = {
         val storePath =
-          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sparkSession.conf, storePath)
+        new CarbonMetastore(sparkSession.conf, storePath, sessionParams)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1c16143..3079c84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonProperties.getInstance()
+        val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
           .getProperty(CarbonCommonConstants.STORE_LOCATION)
         val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
@@ -77,7 +77,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+      .getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 4605914..8d0b4ea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -520,7 +520,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
         System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
       } else {
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+          .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
           CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index a8985b9..805a4df 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -24,8 +24,7 @@ import java.util.{Locale, TimeZone}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
-import org.apache.spark.sql.CastExpr
-import org.apache.spark.sql.sources
+import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources}
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -35,7 +34,8 @@ object CastExpressionOptimization {
 
 
   def typeCastStringToLong(v: Any): Any = {
-    val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance
+    val parser: SimpleDateFormat = new SimpleDateFormat(
+      CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams
       .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index b72f077..627de02 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,7 +18,9 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand}
+import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand}
+
+import org.apache.carbondata.core.util.CarbonProperties
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {
@@ -40,3 +42,17 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     rows
   }
 }
+
+/** Runs Spark's SET, then copies only an explicit `SET key=value` into the session params. */
+case class CarbonSetCommand(command: SetCommand) extends RunnableCommand {
+  override val output = command.output
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val rows = command.run(sparkSession)
+    val params = CarbonEnv.getInstance(sparkSession).sessionParams
+    // Bare `SET`, `SET -v` and `SET key` are reads (rows may even be empty): nothing to copy.
+    for ((key, Some(value)) <- command.kv) params.addProperty(key, value)
+    rows
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 3593b6d..35be543 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable,
-ShowLoadsCommand, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, InsertIntoCarbonTable, ShowLoadsCommand, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.CarbonDropDatabaseCommand
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand}
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -110,13 +109,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           throw new MalformedCarbonCommandException("Unsupported alter operation on hive table")
         }
       case desc@DescribeTableCommand(identifier, partitionSpec, isExtended, isFormatted)
-        if
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
+        if CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(identifier)(sparkSession) && isFormatted =>
         val resolvedTable =
           sparkSession.sessionState.executePlan(UnresolvedRelation(identifier, None)).analyzed
         val resultPlan = sparkSession.sessionState.executePlan(resolvedTable).executedPlan
         ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
+      case set@SetCommand(kv) =>
+        ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0064c21..f1fd05b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    var storeLocation = CarbonProperties.getInstance
+    var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
       )
@@ -359,7 +359,8 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    CarbonEnv.getInstance(sparkSession).sessionParams
+      .addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -408,7 +409,7 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
+      val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
       val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
@@ -428,11 +429,12 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession)
+        .sessionParams.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams.
+        getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
@@ -534,7 +536,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
+          val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28e2e171/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 04a94ce..54cffc2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.{RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.reader.ThriftReader
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -201,18 +201,15 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
     // if zookeeper is configured as carbon lock type.
     val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
     if (null != zookeeperurl) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+      sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
     }
     if (metadataPath == null) {
       return null
     }
     // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+    if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) &&
         FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      CarbonProperties.getInstance
-        .addProperty(CarbonCommonConstants.LOCK_TYPE,
+      sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE,
           CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
         )
       LOGGER.info("Default lock type HDFSLOCK is configured")


[5/5] carbondata git commit: [CARBONDATA-1065] Added set command in carbon to update properties dynamically. This closes #972

Posted by gv...@apache.org.
[CARBONDATA-1065] Added set command in carbon to update properties dynamically. This closes #972


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/88522912
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/88522912
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/88522912

Branch: refs/heads/master
Commit: 88522912ba3248c349c7585c2ed75f70604972ef
Parents: 2234ec8 39644b5
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Tue Jun 27 16:21:22 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jun 27 16:21:22 2017 +0530

----------------------------------------------------------------------
 .../common/constants/LoggerAction.java          |  38 +++++
 .../core/constants/CarbonCommonConstants.java   | 100 ++++++++++-
 .../constants/CarbonLoadOptionConstants.java    |  88 ++++++++++
 .../constants/CarbonV3DataFormatConstants.java  |   5 +
 .../InvalidConfigurationException.java          |  87 ++++++++++
 .../carbondata/core/util/CarbonProperties.java  |  76 +++++++--
 .../carbondata/core/util/CarbonProperty.java    |  28 ++++
 .../carbondata/core/util/CarbonSessionInfo.java |  38 +++++
 .../apache/carbondata/core/util/CarbonUtil.java |  74 ++++++++-
 .../carbondata/core/util/SessionParams.java     | 141 ++++++++++++++++
 .../core/util/ThreadLocalSessionInfo.java       |  34 ++++
 .../hadoop/ft/CarbonInputMapperTest.java        |   5 +
 .../carbondata/hadoop/ft/InputFilesTest.java    |   5 +
 .../dataload/TestGlobalSortDataLoad.scala       |   6 +-
 .../TestLoadDataWithDiffTimestampFormat.scala   |   4 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   8 +-
 .../spark/rdd/AlterTableAddColumnRDD.scala      |   5 +-
 .../spark/rdd/AlterTableDropColumnRDD.scala     |   5 +-
 .../spark/rdd/CarbonCleanFilesRDD.scala         |   5 +-
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |   4 +-
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |   4 +-
 .../spark/rdd/CarbonDropTableRDD.scala          |   6 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  19 +--
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   4 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |  46 ++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   4 +-
 .../spark/rdd/DataLoadCoalescedRDD.scala        |  11 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  25 ++-
 .../spark/rdd/UpdateCoalescedRDD.scala          |   7 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   4 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   7 +-
 .../execution/command/carbonTableSchema.scala   |  19 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   3 +
 .../spark/sql/CarbonDictionaryDecoder.scala     |   5 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  19 ++-
 .../execution/command/CarbonHiveCommands.scala  |  34 +++-
 .../sql/execution/command/DDLStrategy.scala     |  12 +-
 .../execution/command/carbonTableSchema.scala   |  68 +++++---
 .../spark/sql/hive/CarbonSessionState.scala     |   2 +-
 .../spark/sql/internal/CarbonSqlConf.scala      | 144 ++++++++++++++++
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  15 +-
 .../BadRecordPathLoadOptionTest.scala           |  87 ++++++++++
 .../DataLoadFailAllTypeSortTest.scala           |   1 -
 .../commands/SetCommandTestCase.scala           | 165 +++++++++++++++++++
 .../processing/constants/LoggerAction.java      |  38 -----
 .../processing/model/CarbonLoadModel.java       |  14 ++
 .../newflow/DataLoadProcessBuilder.java         |   3 +
 .../newflow/sort/SortScopeOptions.java          |  17 +-
 .../steps/DataConverterProcessorStepImpl.java   |  25 ++-
 ...ConverterProcessorWithBucketingStepImpl.java |  23 ++-
 .../util/CarbonDataProcessorUtil.java           |  16 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |   3 +-
 55 files changed, 1404 insertions(+), 207 deletions(-)
----------------------------------------------------------------------



[2/5] carbondata git commit: Added set/reset commands in carbon to update/reset properties dynamically

Posted by gv...@apache.org.
Added set/reset commands in carbon to update/reset properties dynamically


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/95ce1da1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/95ce1da1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/95ce1da1

Branch: refs/heads/master
Commit: 95ce1da1e6a828255ca6385ae5ab16706e66483f
Parents: 28e2e17
Author: Manohar <ma...@gmail.com>
Authored: Mon Jun 12 18:06:25 2017 +0530
Committer: Manohar <ma...@gmail.com>
Committed: Tue Jun 27 14:39:51 2017 +0530

----------------------------------------------------------------------
 .../core/util/ThreadLocalSessionParams.java     | 34 +++++++++++++++
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  9 ++--
 .../spark/rdd/AlterTableDropColumnRDD.scala     | 10 ++---
 .../spark/rdd/CarbonCleanFilesRDD.scala         |  8 +---
 .../spark/rdd/CarbonDeleteLoadByDateRDD.scala   |  9 +---
 .../spark/rdd/CarbonDeleteLoadRDD.scala         |  9 +---
 .../spark/rdd/CarbonDropTableRDD.scala          | 12 ++---
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   | 34 ++++-----------
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  3 --
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  8 +---
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 46 ++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  8 +---
 .../spark/rdd/DataLoadCoalescedRDD.scala        | 15 +++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 42 +++++++-----------
 .../spark/rdd/UpdateCoalescedRDD.scala          | 10 ++---
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  4 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  7 ++-
 .../spark/sql/hive/CarbonStrategies.scala       |  4 +-
 .../execution/command/CarbonHiveCommands.scala  | 16 +------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 16 +++----
 .../sql/CarbonDatasourceHadoopRelation.scala    |  3 ++
 .../spark/sql/CarbonDictionaryDecoder.scala     |  5 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 12 ++++-
 .../org/apache/spark/sql/CarbonSource.scala     |  5 +--
 .../execution/CarbonLateDecodeStrategy.scala    |  3 +-
 .../execution/CastExpressionOptimization.scala  |  6 +--
 .../execution/command/CarbonHiveCommands.scala  | 26 ++++++++---
 .../sql/execution/command/DDLStrategy.scala     |  4 +-
 .../execution/command/carbonTableSchema.scala   | 16 +++----
 .../apache/spark/sql/hive/CarbonMetastore.scala | 15 ++++---
 .../spark/sql/hive/CarbonSessionState.scala     |  2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala | 15 ++++---
 32 files changed, 217 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
new file mode 100644
index 0000000..354a0ee
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * Keeps the current thread's SessionParams in a static InheritableThreadLocal (set/get below).
+ */
+public class ThreadLocalSessionParams {
+  static final InheritableThreadLocal<SessionParams> threadLocal =
+      new InheritableThreadLocal<SessionParams>();
+
+  public static void setSessionParams(SessionParams sessionParams) {
+    threadLocal.set(sessionParams);
+  }
+
+  public static SessionParams getSessionParams() {
+    return threadLocal.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 61e1e61..7eea95d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -50,24 +50,21 @@ class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Par
 class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
+    carbonStorePath: String)
+  extends CarbonRDD[(Int, String)](sc, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
       new AddColumnPartition(id, column._2, column._1)
     }.toArray
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index ba91673..fde5cd6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * This is a partitioner class for dividing the newly added columns into partitions
@@ -49,9 +48,8 @@ class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Pa
 class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    carbonStorePath: String)
+  extends CarbonRDD[(Int, String)](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
@@ -59,11 +57,9 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     }.toArray
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, String)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val iter = new Iterator[(Int, String)] {
       try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
index c1a30b7..b63fc48 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -34,21 +33,18 @@ class CarbonCleanFilesRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
 
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[(V)] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
index f7bed59..da391cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.rdd.RDD
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.DeletedLoadResult
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -40,12 +39,10 @@ class CarbonDeleteLoadByDateRDD[K, V](
     dimTableName: String,
     storePath: String,
     loadMetadataDetails: List[LoadMetadataDetails])
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {s =>
@@ -53,10 +50,8 @@ class CarbonDeleteLoadByDateRDD[K, V](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     new Iterator[(K, V)] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
index 3ef9cef..9e43d0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
@@ -24,7 +24,6 @@ import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -35,11 +34,9 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     databaseName: String,
     tableName: String,
     partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map {f =>
@@ -47,10 +44,8 @@ class CarbonDeleteLoadRDD[V: ClassTag](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
     val iter = new Iterator[V] {
-      // Add the properties added in driver to executor.
-      CarbonProperties.getInstance().setProperties(addedProperies)
       val split = theSplit.asInstanceOf[CarbonLoadPartition]
       logInfo("Input split: " + split.serializableHadoopSplit.value)
       // TODO call CARBON delete API

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
index 54f8ea5..d1d49b9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.Value
 import org.apache.carbondata.spark.util.CarbonQueryUtil
 
@@ -31,12 +30,10 @@ class CarbonDropTableRDD[V: ClassTag](
     valueClass: Value[V],
     databaseName: String,
     tableName: String)
-  extends RDD[V](sc, Nil) {
+  extends CarbonRDD[V](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
     splits.zipWithIndex.map { s =>
@@ -44,12 +41,9 @@ class CarbonDropTableRDD[V: ClassTag](
     }
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[V] = {
 
-    val iter = new Iterator[V] {
+      val iter = new Iterator[V] {
       // TODO: Clear Btree from memory
 
       var havePair = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 434fb3c..d0f9362 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -176,19 +176,15 @@ case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends S
 class CarbonAllDictionaryCombineRDD(
     prev: RDD[(String, Iterable[String])],
     model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
 
   override def getPartitions: Array[Partition] = {
     firstParent[(String, Iterable[String])].partitions
   }
 
-  override def compute(split: Partition, context: TaskContext
+    override def internalCompute(split: Partition, context: TaskContext
   ): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val distinctValuesList = new ArrayBuffer[(Int, mutable.HashSet[String])]
     /*
      * for all dictionary, all columns need to encoding and checking
@@ -273,17 +269,12 @@ class StringArrayRow(var values: Array[String]) extends Row {
 class CarbonBlockDistinctValuesCombineRDD(
     prev: RDD[Row],
     model: DictionaryLoadModel)
-  extends RDD[(Int, ColumnDistinctValues)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
@@ -338,16 +329,13 @@ class CarbonBlockDistinctValuesCombineRDD(
 class CarbonGlobalDictionaryGenerateRDD(
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends RDD[(Int, String, Boolean)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, String, Boolean)](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
-  override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
+  override def internalCompute(split: Partition,
+      context: TaskContext): Iterator[(Int, String, Boolean)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
     val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
@@ -544,9 +532,7 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     dimensions: Array[CarbonDimension],
     hdfsLocation: String,
     dictFolderPath: String)
-  extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 
   override def getPartitions: Array[Partition] = {
     val primDimensions = dictionaryLoadModel.primDimensions
@@ -558,10 +544,8 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     result
   }
 
-  override def compute(split: Partition, context: TaskContext)
+  override def internalCompute(split: Partition, context: TaskContext)
   : Iterator[(Int, ColumnDistinctValues)] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
     val primDimension = theSplit.preDefDictDimension
     // read the column dict data

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 38e3680..277005b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -51,8 +50,6 @@ class CarbonIUDMergerRDD[K, V](
     carbonMergerMapping,
     confExecutorsTemp) {
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
     val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index dec3ee3..908043a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -59,7 +59,7 @@ class CarbonMergerRDD[K, V](
     carbonLoadModel: CarbonLoadModel,
     carbonMergerMapping: CarbonMergerMapping,
     confExecutorsTemp: String)
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
   sc.setLocalProperty("spark.job.interruptOnCancel", "true")
@@ -74,12 +74,8 @@ class CarbonMergerRDD[K, V](
   val factTableName = carbonMergerMapping.factTableName
   val tableId = carbonMergerMapping.tableId
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     val iter = new Iterator[(K, V)] {
 
       carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
new file mode 100644
index 0000000..e00dd0f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+
+/**
+ * Base RDD that captures the thread-local SessionParams at construction and re-sets them in compute.
+ */
+abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
+
+  val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams
+
+  /** Auxiliary constructor: uses the parent's context and a single OneToOneDependency on it. */
+  def this(@transient oneParent: RDD[_]) =
+    this (oneParent.context, List(new OneToOneDependency(oneParent)))
+
+  // Subclasses implement the per-partition computation here; compute itself is final.
+  def internalCompute(split: Partition, context: TaskContext): Iterator[T]
+  // Publishes sessionParams to ThreadLocalSessionParams, then delegates to internalCompute.
+  final def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    internalCompute(split, context)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 2c10e65..3868342 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -54,7 +54,7 @@ class CarbonScanRDD(
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient carbonTable: CarbonTable)
-  extends RDD[InternalRow](sc, Nil) {
+  extends CarbonRDD[InternalRow](sc, Nil) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   private val jobTrackerId: String = {
@@ -67,8 +67,6 @@ class CarbonScanRDD(
 
   private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName)
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   @transient private val jobId = new JobID(jobTrackerId, id)
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
@@ -175,15 +173,13 @@ class CarbonScanRDD(
     result.toArray(new Array[Partition](result.size()))
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties"
       )
     }
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
 
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
index 5da0835..b2d04ac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadCoalescedRDD.scala
@@ -21,26 +21,21 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
 
 class DataLoadCoalescedRDD[T: ClassTag](
-  @transient var prev: RDD[T],
-  nodeList: Array[String])
-    extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    @transient var prev: RDD[T],
+    nodeList: Array[String])
+  extends CarbonRDD[DataLoadPartitionWrap[T]](prev.context, Nil) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
-
     new Iterator[DataLoadPartitionWrap[T]] {
       val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
       def hasNext = iter.hasNext

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 5790369..129c642 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -20,7 +20,6 @@ package org.apache.carbondata.spark.rdd
 import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
 import java.nio.ByteBuffer
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, UUID}
 
 import scala.collection.JavaConverters._
@@ -127,16 +126,12 @@ class SparkPartitionLoader(model: CarbonLoadModel,
 
   var storeLocation: String = ""
 
-  def initialize(addedProperies: util.Map[String, String]): Unit = {
+  def initialize(): Unit = {
     val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
@@ -177,7 +172,7 @@ class NewCarbonDataLoadRDD[K, V](
     loadCount: Integer,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     isTableSplitPartition: Boolean)
-  extends RDD[(K, V)](sc, Nil) {
+  extends CarbonRDD[(K, V)](sc, Nil) {
 
   sc.setLocalProperty("spark.scheduler.pool", "DDL")
 
@@ -190,8 +185,6 @@ class NewCarbonDataLoadRDD[K, V](
   private val confBroadcast =
     sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
-
   override def getPartitions: Array[Partition] = {
     if (isTableSplitPartition) {
       // for table split partition
@@ -222,7 +215,7 @@ class NewCarbonDataLoadRDD[K, V](
     // Do nothing. Hadoop RDD should not be checkpointed.
   }
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       var partitionID = "0"
@@ -246,7 +239,7 @@ class NewCarbonDataLoadRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model,
           loader.storeLocation,
           recordReaders)
@@ -391,17 +384,16 @@ class NewCarbonDataLoadRDD[K, V](
  *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor
  */
 class NewDataFrameLoaderRDD[K, V](
-                                   sc: SparkContext,
-                                   result: DataLoadResult[K, V],
-                                   carbonLoadModel: CarbonLoadModel,
-                                   loadCount: Integer,
-                                   tableCreationTime: Long,
-                                   schemaLastUpdatedTime: Long,
-                                   prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val partitionID = "0"
@@ -438,7 +430,7 @@ class NewDataFrameLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: BadRecordFoundException =>
@@ -593,11 +585,9 @@ class PartitionTableDataLoaderRDD[K, V](
     loadCount: Integer,
     tableCreationTime: Long,
     schemaLastUpdatedTime: Long,
-    prev: RDD[Row]) extends RDD[(K, V)](prev) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+    prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val partitionID = "0"
@@ -625,7 +615,7 @@ class PartitionTableDataLoaderRDD[K, V](
           String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
-        loader.initialize(addedProperies)
+        loader.initialize()
         new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: BadRecordFoundException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
index 30050f7..1025da7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateCoalescedRDD.scala
@@ -22,25 +22,21 @@ import scala.reflect.ClassTag
 import org.apache.spark._
 import org.apache.spark.rdd.{CoalescedRDDPartition, DataLoadPartitionCoalescer, RDD}
 
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 // This RDD distributes previous RDD data based on number of nodes. i.e., one partition for one node
 
 class UpdateCoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     nodeList: Array[String])
-  extends RDD[T](prev.context, Nil) {
-
-  private val addedProperies = CarbonProperties.getInstance().getAddedProperies
+  extends CarbonRDD[T](prev.context, Nil) {
 
   override def getPartitions: Array[Partition] = {
     new DataLoadPartitionCoalescer(prev, nodeList).run
   }
 
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
       context: TaskContext): Iterator[T] = {
-    // Add the properties added in driver to executor.
-    CarbonProperties.getInstance().setProperties(addedProperies)
     // This iterator combines data from all the parent partitions
     new Iterator[T] {
       val parentPartitionIter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 6b94894..bcfc096 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.spark.rdd
 
-import java.util
-
 import scala.collection.mutable
 
 import org.apache.spark.TaskContext
@@ -54,7 +52,7 @@ object UpdateDataLoad {
         segId,
         loadMetadataDetails)
       // Intialize to set carbon properties
-      loader.initialize(new util.HashMap)
+      loader.initialize()
 
       loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
       new DataLoadExecutor().execute(carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0e6153f..2fc93e6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
 import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
+import org.apache.carbondata.spark.rdd.CarbonRDD
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 private[sql] case class CarbonDatasourceHadoopRelation(
@@ -94,7 +95,6 @@ private[sql] case class CarbonDatasourceHadoopRelation(
     requiredColumns.foreach(projection.addColumn)
     CarbonInputFormat.setColumnProjection(conf, projection)
     CarbonInputFormat.setCarbonReadSupport(conf, classOf[SparkRowReadSupportImpl])
-
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
       new SerializableConfiguration(conf),
       absIdentifier,
@@ -120,7 +120,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   identifier: AbsoluteTableIdentifier,
   inputFormatClass: Class[_ <: CarbonInputFormat[V]],
   valueClass: Class[V])
-  extends RDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
+  extends CarbonRDD[V](sc, Nil) with SparkHadoopMapReduceUtil {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -128,8 +128,7 @@ class CarbonHadoopFSRDD[V: ClassTag](
   }
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
-  @DeveloperApi
-  override def compute(split: Partition,
+  override def internalCompute(split: Partition,
     context: TaskContext): Iterator[V] = {
     val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 7bfd742..f0cd33b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
 import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, LogicalPlan}
-import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SetCommand, SparkPlan}
+import org.apache.spark.sql.execution.{ExecutedCommand, Filter, Project, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => LogicalDescribeCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution.{DropTable, HiveNativeCommand}
@@ -316,8 +316,6 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
         } else {
           ExecutedCommand(HiveNativeCommand(sql)) :: Nil
         }
-      case set@SetCommand(kv) =>
-        ExecutedCommand(CarbonSetCommand(set)) :: Nil
       case _ =>
         Nil
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index d047b20..0f42940 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.{RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.execution.command.DropTableCommand
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 
-import org.apache.carbondata.core.util.CarbonProperties
-
 private[hive] case class CreateDatabaseCommand(dbName: String,
     command: HiveNativeCommand) extends RunnableCommand {
   def run(sqlContext: SQLContext): Seq[Row] = {
@@ -55,15 +53,3 @@ private[hive] case class DropDatabaseCascadeCommand(dbName: String,
     rows
   }
 }
-
-case class CarbonSetCommand(command: SetCommand)
-  extends RunnableCommand {
-
-  override val output = command.output
-
-  override def run(sparkSession: SQLContext): Seq[Row] = {
-    val rows = command.run(sparkSession)
-    CarbonProperties.getInstance().addProperty(rows.head.getString(0), rows.head.getString(1))
-    rows
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2b77654..48af516 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -128,7 +128,7 @@ object CarbonDataRDDFactory {
       isCompactionTriggerByDDl
     )
 
-    val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
           CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
         )
@@ -275,8 +275,8 @@ object CarbonDataRDDFactory {
               exception = e
           }
           // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession).
-            sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
                 CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
               ).equalsIgnoreCase("true")
 
@@ -397,8 +397,8 @@ object CarbonDataRDDFactory {
         }
         storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
-        val isConcurrentCompactionAllowed = CarbonEnv.getInstance(sqlContext.sparkSession)
-          .sessionParams.getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
               CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
             )
             .equalsIgnoreCase("true")
@@ -1042,8 +1042,7 @@ object CarbonDataRDDFactory {
     val timeStampFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val timestampFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
-        .sessionParams.getProperty(CarbonCommonConstants
+      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
       new SimpleDateFormat(timestampFormatString)
     }
@@ -1051,8 +1050,7 @@ object CarbonDataRDDFactory {
     val dateFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
     } else {
-      val dateFormatString = CarbonEnv.getInstance(sqlContext.sparkSession)
-        .sessionParams.getProperty(CarbonCommonConstants
+      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
         .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
       new SimpleDateFormat(dateFormatString)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index b0044d7..7c096d3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.processing.merger.TableMeta
@@ -52,6 +53,8 @@ case class CarbonDatasourceHadoopRelation(
       absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
     .asInstanceOf[CarbonRelation]
 
+  val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+  ThreadLocalSessionParams.setSessionParams(sessionParams)
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 49cf54f..bd1c8b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+import org.apache.carbondata.spark.rdd.CarbonRDD
 
 /**
  * It decodes the data.
@@ -444,7 +445,7 @@ class CarbonDecoderRDD(
     prev: RDD[InternalRow],
     output: Seq[Attribute],
     sparkSession: SparkSession)
-  extends RDD[InternalRow](prev) {
+  extends CarbonRDD[InternalRow](prev) {
 
   private val storepath = CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath
 
@@ -513,7 +514,7 @@ class CarbonDecoderRDD(
     dictIds
   }
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+  override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val absoluteTableIdentifiers = relations.map { relation =>
       val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
       (carbonTable.getFactTableName, carbonTable.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0851ec2..78820ea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -21,10 +21,11 @@ import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.hive.{CarbonMetastore, CarbonSessionCatalog}
+import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -48,11 +49,18 @@ class CarbonEnv {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
       sessionParams = new SessionParams()
+      ThreadLocalSessionParams.setSessionParams(sessionParams)
+      val config = new CarbonSQLConf(sparkSession)
+      if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
+        config.addDefaultCarbonParams()
+      }
+      // add session params after adding DefaultCarbonParams
+      config.addDefaultCarbonSessionParams()
       carbonMetastore = {
         val storePath =
         CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
         LOGGER.info(s"carbon env initial: $storePath")
-        new CarbonMetastore(sparkSession.conf, storePath, sessionParams)
+        new CarbonMetastore(sparkSession.conf, storePath)
       }
       CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
       initialized = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 3079c84..1c16143 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -56,7 +56,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
+        val storePath = CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.STORE_LOCATION)
         val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
@@ -77,8 +77,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
-      .getProperty(CarbonCommonConstants.STORE_LOCATION)
+    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 8d0b4ea..4605914 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -520,8 +520,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
         System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
       } else {
-        CarbonEnv.getInstance(sqlContext.sparkSession).sessionParams
-          .getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
           CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
index 805a4df..a8985b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala
@@ -24,7 +24,8 @@ import java.util.{Locale, TimeZone}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
-import org.apache.spark.sql.{CarbonEnv, CastExpr, SparkSession, sources}
+import org.apache.spark.sql.CastExpr
+import org.apache.spark.sql.sources
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, TimestampType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -34,8 +35,7 @@ object CastExpressionOptimization {
 
 
   def typeCastStringToLong(v: Any): Any = {
-    val parser: SimpleDateFormat = new SimpleDateFormat(
-      CarbonEnv.getInstance(SparkSession.getActiveSession.get).sessionParams
+    val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index 627de02..a4feead 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.execution.command
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, RunnableCommand, SetCommand}
+import org.apache.spark.sql.execution.command.{CarbonDropTableCommand, DropDatabaseCommand, ResetCommand, RunnableCommand, SetCommand}
 
 import org.apache.carbondata.core.util.CarbonProperties
 
@@ -49,10 +49,26 @@ case class CarbonSetCommand(command: SetCommand)
   override val output = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val rows = command.run(sparkSession)
-    CarbonEnv.getInstance(sparkSession).sessionParams
-      .addProperty(rows.head.getString(0), rows.head.getString(1))
-    rows
+    val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams
+    command.kv match {
+      case Some((key, Some(value))) =>
+        val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
+        if (isCarbonProperty) {
+          sessionParms.addProperty(key, value)
+        }
+      case _ =>
+
+    }
+    command.run(sparkSession)
   }
 }
 
+case class CarbonResetCommand() extends RunnableCommand {
+  override val output = ResetCommand.output
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val carbonEnv = CarbonEnv.getInstance(sparkSession)
+    carbonEnv.sessionParams.clear() // carbon-side reset first; Spark's own RESET follows
+    ResetCommand.run(sparkSession)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
index 35be543..7d0215f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/DDLStrategy.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
-import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonSetCommand}
+import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -117,6 +117,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(DescribeCommandFormatted(resultPlan, plan.output, identifier)) :: Nil
       case set@SetCommand(kv) =>
         ExecutedCommandExec(CarbonSetCommand(set)) :: Nil
+      case reset@ResetCommand =>
+        ExecutedCommandExec(CarbonResetCommand()) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f1fd05b..0064c21 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -107,7 +107,7 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    var storeLocation = CarbonEnv.getInstance(sparkSession).sessionParams
+    var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
       )
@@ -359,8 +359,7 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonEnv.getInstance(sparkSession).sessionParams
-      .addProperty("zookeeper.enable.lock", "false")
+    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -409,7 +408,7 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonEnv.getInstance(sparkSession).sessionParams
+      val badRecordActionValue = CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
       val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
@@ -429,12 +428,11 @@ case class LoadTable(
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonEnv.getInstance(sparkSession)
-        .sessionParams.getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonEnv.getInstance(sparkSession).sessionParams.
-        getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
         .setSerializationNullFormat(
@@ -536,7 +534,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonEnv.getInstance(sparkSession).sessionParams
+          val dictionaryServerPort = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index 54cffc2..04a94ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.spark.sql.{CarbonEnv, RuntimeConfig, SparkSession}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.reader.ThriftReader
 import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil, SessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
@@ -104,7 +104,7 @@ case class DictionaryMap(dictionaryMap: Map[String, Boolean]) {
   }
 }
 
-class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams: SessionParams) {
+class CarbonMetastore(conf: RuntimeConfig, val storePath: String) {
 
   @transient
   val LOGGER = LogServiceFactory.getLogService("org.apache.spark.sql.CarbonMetastoreCatalog")
@@ -201,15 +201,18 @@ class CarbonMetastore(conf: RuntimeConfig, val storePath: String, sessionParams:
     // if zookeeper is configured as carbon lock type.
     val zookeeperurl = conf.get(CarbonCommonConstants.ZOOKEEPER_URL, null)
     if (null != zookeeperurl) {
-      sessionParams.addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.ZOOKEEPER_URL, zookeeperurl)
     }
     if (metadataPath == null) {
       return null
     }
     // if no locktype is configured and store type is HDFS set HDFS lock as default
-    if (null == sessionParams.getProperty(CarbonCommonConstants.LOCK_TYPE) &&
+    if (null == CarbonProperties.getInstance
+      .getProperty(CarbonCommonConstants.LOCK_TYPE) &&
         FileType.HDFS == FileFactory.getFileType(metadataPath)) {
-      sessionParams.addProperty(CarbonCommonConstants.LOCK_TYPE,
+      CarbonProperties.getInstance
+        .addProperty(CarbonCommonConstants.LOCK_TYPE,
           CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS
         )
       LOGGER.info("Default lock type HDFSLOCK is configured")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 156a12e..4aef118 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -115,7 +115,7 @@ class CarbonSessionCatalog(
  */
 class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) {
 
-  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf)
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
 
   experimentalMethods.extraStrategies =
     Seq(new CarbonLateDecodeStrategy, new DDLStrategy(sparkSession))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/95ce1da1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 258920b..3412fb0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,30 +18,33 @@ package org.apache.spark.sql.parser
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
-import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext,
-TablePropertyListContext}
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{CreateTableContext, TablePropertyListContext}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkSqlAstBuilder
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field,
-PartitionerField, TableModel}
+import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
+import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
- * Concrete parser for Spark SQL statements and carbon specific statements
+ * Concrete parser for Spark SQL statements and carbon specific
+ * statements
  */
-class CarbonSparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
+class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends AbstractSqlParser {
 
   val astBuilder = new CarbonSqlAstBuilder(conf)
 
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
+    val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
+    ThreadLocalSessionParams.setSessionParams(sessionParams)
     try {
       super.parsePlan(sqlText)
     } catch {


[3/5] carbondata git commit: 1. Refactored the bad record code, by default the bad record path will be empty, if bad record logger is enabled or action is redirect and bad record path is not configured then data-load will fail. 2. Support dynamic set

Posted by gv...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0064c21..f9f556d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -35,9 +35,10 @@ import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -359,7 +360,8 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -407,31 +409,60 @@ case class LoadTable(
       val commentChar = options.getOrElse("commentchar", "#")
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
-      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
+      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable",
+        carbonProperty
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+            CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
+      val badRecordActionValue = carbonProperty
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
-      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
+      val badRecordsAction = options.getOrElse("bad_records_action", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue))
+      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
       val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
-      val dateFormat = options.getOrElse("dateformat", null)
+      val dateFormat = options.getOrElse("dateformat",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))
       ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      val sortScope = options.getOrElse("sort_scope", null)
+      val sortScope = options
+        .getOrElse("sort_scope",
+          carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+            carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+              CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))
       ValidateUtil.validateSortScope(table, sortScope)
-      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
-      val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
+      val bad_record_path = options.getOrElse("bad_record_path",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))
+      if (badRecordsLoggerEnable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
+      val globalSortPartitions = options.getOrElse("global_sort_partitions",
+        carbonProperty
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
@@ -449,7 +480,9 @@ case class LoadTable(
       carbonLoadModel.setSortScope(sortScope)
       carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
       carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
-      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
+      val useOnePass = options.getOrElse("single_pass",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
         case "true" =>
           true
         case "false" =>
@@ -534,7 +567,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
+          val dictionaryServerPort = carbonProperty
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
@@ -776,13 +809,6 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
         }
-        // delete bad record log after drop table
-        val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
-        val badLogFileType = FileFactory.getFileType(badLogPath)
-        if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-          val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
new file mode 100644
index 0000000..51b29a1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes the default values of carbon's dynamic parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * To initialize dynamic param defaults along with usage docs
+   * NOTE(review): the local vals are never read; presumably building each entry registers
+   * it with SQLConf as a side effect -- confirm before removing them.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To enable/ disable carbon custom block distribution.")
+        .booleanConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    val BAD_RECORDS_LOGGER_ENABLE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide whether empty data to be considered bad/ good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure data format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+
+  /**
+   * to set the dynamic properties default values
+   * on sparkSession.conf; every key is written exactly once
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 3412fb0..41d6bd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkSqlAstBuilder
 import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -43,8 +43,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
-    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    val carbonSessionInfo: CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     try {
       super.parsePlan(sqlText)
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
new file mode 100644
index 0000000..846c4b6
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class verifying that bad record log and csv files are written to the configured path
+ *
+ *
+ */
+class BadRecordPathLoadOptionTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+  var badRecordPath: String = null
+  override def beforeAll {
+    try {
+       badRecordPath = new File("./target/test/badRecords")
+        .getCanonicalPath.replaceAll("\\\\","/")
+      sql("drop table IF EXISTS salestest")
+    }
+  }
+
+  test("data load log file and csv file written at the configured location") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+    sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH}=${badRecordPath}")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE salestest OPTIONS" +
+        "('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    val location: Boolean = isFilesWrittenAtBadStoreLocation
+    assert(location)
+  }
+
+  override def afterAll {
+    sql("drop table salestest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+  def isFilesWrittenAtBadStoreLocation: Boolean = {
+    val badStorePath = badRecordPath + "/default/salestest/0/0"
+    val carbonFile: CarbonFile = FileFactory
+      .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
+    var exists: Boolean = carbonFile.exists()
+    if (exists) {
+      val listFiles: Array[CarbonFile] = carbonFile.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          if (file.getName.endsWith(".log") || file.getName.endsWith(".csv")) {
+            return true;
+          }
+          return false;
+        }
+      })
+      exists = listFiles.size > 0
+    }
+    return exists;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 5e91574..6f57cd6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -238,7 +238,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
-
   //
   override def afterAll {
     sql("drop table IF EXISTS data_pm")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
new file mode 100644
index 0000000..18b4039
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.commands
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+
+class SetCommandTestCase extends QueryTest with BeforeAndAfterAll{
+  override def beforeAll: Unit = {
+    sql("set carbon=true")
+  }
+  test("test set command") {
+    checkAnswer(sql("set"), sql("set"))
+  }
+
+  test("test set any value command") {
+    checkAnswer(sql("set carbon=false"), sql("set carbon"))
+  }
+
+  test("test set command for enable.unsafe.sort=true") {
+    checkAnswer(sql("set enable.unsafe.sort=true"), sql("set enable.unsafe.sort"))
+  }
+
+  test("test set command for enable.unsafe.sort for invalid option") {
+    try {
+      checkAnswer(sql("set enable.unsafe.sort=123"), sql("set enable.unsafe.sort"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //bad_records_logger_enable
+  test(s"test set command for" +
+       s" ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+    }=true"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+  }
+
+  test(s"test set command for ${
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${
+        CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+      }=123"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  test(s"test set command for ${
+    CarbonLoadOptionConstants
+      .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+  }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+    }=true"),
+      sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD }"))
+  }
+
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD} " +
+       s"for invalid option") {
+    try {
+      checkAnswer(
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //carbon.custom.block.distribution
+  test("test set command for carbon.custom.block.distribution=true") {
+    checkAnswer(sql("set carbon.custom.block.distribution=true"),
+      sql("set carbon.custom.block.distribution"))
+  }
+
+  test("test set command for carbon.custom.block.distribution for invalid option") {
+    try {
+      checkAnswer(sql("set carbon.custom.block.distribution=123"),
+        sql("set carbon.custom.block.distribution"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // sort_scope
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+  }
+
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // batch_sort_size_inmb
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+  }
+
+  test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=hjf"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // single_pass
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+  }
+
+  test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  override def afterAll {
+    sql("reset")
+    sql("set carbon=true")
+    checkAnswer(sql("set carbon"),
+      sql("set"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
deleted file mode 100644
index 901df3b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-/**
- * enum to hold the bad record logger action
- */
-public enum LoggerAction {
-
-  FORCE("FORCE"), // data will be converted to null
-  REDIRECT("REDIRECT"), // no null conversion moved to bad record and written to raw csv
-  IGNORE("IGNORE"), // no null conversion moved to bad record and not written to raw csv
-  FAIL("FAIL");  //data loading will fail if a bad record is found
-  private String name;
-
-  LoggerAction(String name) {
-    this.name = name;
-  }
-
-  @Override public String toString() {
-    return this.name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 7ec7933..bfc1be9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -158,6 +158,10 @@ public class CarbonLoadModel implements Serializable {
    * Batch sort size in mb.
    */
   private String batchSortSizeInMb;
+  /**
+   * bad record location
+   */
+  private String badRecordsLocation;
 
   /**
    * Number of partitions in global sort.
@@ -363,6 +367,7 @@ public class CarbonLoadModel implements Serializable {
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copy.sortScope = sortScope;
     copy.batchSortSizeInMb = batchSortSizeInMb;
+    copy.badRecordsLocation = badRecordsLocation;
     return copy;
   }
 
@@ -464,6 +469,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copyObj.sortScope = sortScope;
     copyObj.batchSortSizeInMb = batchSortSizeInMb;
+    copyObj.badRecordsLocation = badRecordsLocation;
     return copyObj;
   }
 
@@ -764,4 +770,12 @@ public class CarbonLoadModel implements Serializable {
   public void setGlobalSortPartitions(String globalSortPartitions) {
     this.globalSortPartitions = globalSortPartitions;
   }
+
+  public String getBadRecordsLocation() {
+    return badRecordsLocation;
+  }
+
+  public void setBadRecordsLocation(String badRecordsLocation) {
+    this.badRecordsLocation = badRecordsLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 3294d5f..5662a04 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -180,6 +181,8 @@ public final class DataLoadProcessBuilder {
         loadModel.getBatchSortSizeInMb());
     configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
         loadModel.getGlobalSortPartitions());
+    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
index 1cc043f..2bf8e16 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.newflow.sort;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Sort scope options
@@ -43,21 +44,7 @@ public class SortScopeOptions {
   }
 
   public static boolean isValidSortOption(String sortScope) {
-    if (sortScope == null) {
-      return false;
-    }
-    switch (sortScope.toUpperCase()) {
-      case "BATCH_SORT":
-        return true;
-      case "LOCAL_SORT":
-        return true;
-      case "GLOBAL_SORT":
-        return true;
-      case "NO_SORT":
-        return true;
-      default:
-        return false;
-    }
+    return CarbonUtil.isValidSortOption(sortScope);
   }
 
   public enum SortScope {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 000d0b9..62d6c94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -152,16 +153,22 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
-        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
-        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
-            + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(configuration,
+            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
+                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
         badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
   }
 
-  public static String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
+      String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -198,7 +205,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     // rename the bad record in progress to normal
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
         identifier.getDatabaseName() + File.separator + identifier.getTableName()
             + File.separator + configuration.getSegmentId() + File.separator + configuration
             .getTaskNo());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index d6185ba..c6f83ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -24,13 +24,14 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -41,6 +42,7 @@ import org.apache.carbondata.processing.newflow.partition.Partitioner;
 import org.apache.carbondata.processing.newflow.partition.impl.HashPartitionerImpl;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -187,8 +189,12 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   }
 
   private String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -200,6 +206,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
       super.close();
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
+        renameBadRecord(configuration);
       }
       if (converters != null) {
         for (RowConverter converter : converters) {
@@ -208,7 +215,15 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
       }
     }
   }
-
+  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + identifier.getTableName()
+            + File.separator + configuration.getSegmentId() + File.separator + configuration
+            .getTaskNo());
+  }
   @Override protected String getStepName() {
     return "Data Converter with Bucketing";
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 84e1f20..62f13db 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -90,12 +91,18 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * @param configuration load configuration supplying the bad record path option, if set
    * @param storeLocation
    */
-  public static void renameBadRecordsFromInProgressToNormal(String storeLocation) {
+  public static void renameBadRecordsFromInProgressToNormal(
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
     // get the base store location
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     FileType fileType = FileFactory.getFileType(badLogStoreLocation);
@@ -466,7 +473,8 @@ public final class CarbonDataProcessorUtil {
       if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
           == null) {
         batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT));
       } else {
         batchSortSizeInMb = Integer.parseInt(
             configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index d5a4f02..fdbd2f8 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -59,8 +59,9 @@ public class BlockIndexStoreTest extends TestCase {
 
   @BeforeClass public void setUp() {
 	property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
-	
 	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");


[4/5] carbondata git commit: 1. Refactored the bad record code, by default the bad record path will be empty, if bad record logger is enabled or action is redirect and bad record path is not configured then data-load will fail. 2. Support dynamic set

Posted by gv...@apache.org.
1. Refactored the bad record code: by default the bad record path will be empty; if the bad record logger is enabled or the action is redirect and the bad record path is not configured, then the data load will fail. 2. Support dynamic set command for some of the load options 3. Fixed test cases 4. Added validation for the supported properties in the dynamic set command 5. Changed table delete behavior // now the bad records are not deleted on table drop 6. Added test case for bad record path in load option 7. Fixed failing test cases 8. Added "carbon.options." in load options parameters


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/39644b5e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/39644b5e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/39644b5e

Branch: refs/heads/master
Commit: 39644b5e003bddf89c80ad539506b4a29b04b526
Parents: 95ce1da
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Mon Jun 12 18:33:22 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jun 27 16:10:28 2017 +0530

----------------------------------------------------------------------
 .../common/constants/LoggerAction.java          |  38 +++++
 .../core/constants/CarbonCommonConstants.java   | 100 ++++++++++-
 .../constants/CarbonLoadOptionConstants.java    |  88 ++++++++++
 .../constants/CarbonV3DataFormatConstants.java  |   5 +
 .../InvalidConfigurationException.java          |  87 ++++++++++
 .../carbondata/core/util/CarbonProperties.java  |  97 +++++++----
 .../carbondata/core/util/CarbonProperty.java    |  28 ++++
 .../carbondata/core/util/CarbonSessionInfo.java |  38 +++++
 .../apache/carbondata/core/util/CarbonUtil.java |  74 ++++++++-
 .../carbondata/core/util/SessionParams.java     | 127 ++++++++++----
 .../core/util/ThreadLocalSessionInfo.java       |  34 ++++
 .../core/util/ThreadLocalSessionParams.java     |  34 ----
 .../hadoop/ft/CarbonInputMapperTest.java        |   5 +
 .../carbondata/hadoop/ft/InputFilesTest.java    |   5 +
 .../testsuite/commands/SetCommandTestCase.scala |  34 ----
 .../dataload/TestGlobalSortDataLoad.scala       |   6 +-
 .../TestLoadDataWithDiffTimestampFormat.scala   |   4 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   8 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   6 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   4 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../execution/command/carbonTableSchema.scala   |  19 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   8 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../execution/command/carbonTableSchema.scala   |  68 +++++---
 .../spark/sql/internal/CarbonSqlConf.scala      | 144 ++++++++++++++++
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   6 +-
 .../BadRecordPathLoadOptionTest.scala           |  87 ++++++++++
 .../DataLoadFailAllTypeSortTest.scala           |   1 -
 .../commands/SetCommandTestCase.scala           | 165 +++++++++++++++++++
 .../processing/constants/LoggerAction.java      |  38 -----
 .../processing/model/CarbonLoadModel.java       |  14 ++
 .../newflow/DataLoadProcessBuilder.java         |   3 +
 .../newflow/sort/SortScopeOptions.java          |  17 +-
 .../steps/DataConverterProcessorStepImpl.java   |  25 ++-
 ...ConverterProcessorWithBucketingStepImpl.java |  23 ++-
 .../util/CarbonDataProcessorUtil.java           |  16 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |   3 +-
 41 files changed, 1205 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java b/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
new file mode 100644
index 0000000..9c027fe
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.constants;
+
+/**
+ * Actions that can be taken for a bad record; toString() returns the action name
+ */
+public enum LoggerAction {
+
+  FORCE("FORCE"), // bad data will be converted to null
+  REDIRECT("REDIRECT"), // no null conversion; moved to bad record and written to raw csv
+  IGNORE("IGNORE"), // no null conversion; moved to bad record but not written to raw csv
+  FAIL("FAIL");  // data loading will fail if a bad record is found
+  private String name; // action name passed to the constructor, returned by toString()
+
+  LoggerAction(String name) {
+    this.name = name;
+  }
+
+  @Override public String toString() {
+    return this.name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a9b2eb7..208bab8 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.constants;
 
 import java.nio.charset.Charset;
 
+import org.apache.carbondata.core.util.CarbonProperty;
+
 public final class CarbonCommonConstants {
   /**
    * integer size in bytes
@@ -51,18 +53,22 @@ public final class CarbonCommonConstants {
   /**
    * location of the carbon member, hierarchy and fact files
    */
+  @CarbonProperty
   public static final String STORE_LOCATION = "carbon.storelocation";
   /**
    * blocklet size in carbon file
    */
+  @CarbonProperty
   public static final String BLOCKLET_SIZE = "carbon.blocklet.size";
   /**
    * Number of cores to be used
    */
+  @CarbonProperty
   public static final String NUM_CORES = "carbon.number.of.cores";
   /**
    * carbon sort size
    */
+  @CarbonProperty
   public static final String SORT_SIZE = "carbon.sort.size";
   /**
    * default location of the carbon member, hierarchy and fact files
@@ -123,6 +129,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_DDL_BASE_HDFS_URL
    */
+  @CarbonProperty
   public static final String CARBON_DDL_BASE_HDFS_URL = "carbon.ddl.base.hdfs.url";
   /**
    * Load Folder Name
@@ -139,6 +146,7 @@ public final class CarbonCommonConstants {
   /**
    * FS_DEFAULT_FS
    */
+  @CarbonProperty
   public static final String FS_DEFAULT_FS = "fs.defaultFS";
   /**
    * BYTEBUFFER_SIZE
@@ -182,11 +190,12 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_BADRECORDS_LOCATION
    */
+  @CarbonProperty
   public static final String CARBON_BADRECORDS_LOC = "carbon.badRecords.location";
   /**
    * CARBON_BADRECORDS_LOCATION_DEFAULT
    */
-  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "/tmp/carbon/badRecords";
+  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "";
   /**
    * HIERARCHY_FILE_EXTENSION
    */
@@ -220,6 +229,7 @@ public final class CarbonCommonConstants {
   /**
    * GRAPH_ROWSET_SIZE
    */
+  @CarbonProperty
   public static final String GRAPH_ROWSET_SIZE = "carbon.graph.rowset.size";
   /**
    * GRAPH_ROWSET_SIZE_DEFAULT
@@ -244,6 +254,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_INTERMEDIATE_FILES_LIMIT
    */
+  @CarbonProperty
   public static final String SORT_INTERMEDIATE_FILES_LIMIT = "carbon.sort.intermediate.files.limit";
   /**
    * SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE
@@ -260,10 +271,12 @@ public final class CarbonCommonConstants {
   /**
    * SORT_FILE_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String SORT_FILE_BUFFER_SIZE = "carbon.sort.file.buffer.size";
   /**
    * no.of records after which counter to be printed
    */
+  @CarbonProperty
   public static final String DATA_LOAD_LOG_COUNTER = "carbon.load.log.counter";
   /**
    * DATA_LOAD_LOG_COUNTER_DEFAULT_COUNTER
@@ -272,6 +285,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_FILE_WRITE_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE =
       "carbon.sort.file.write.buffer.size";
   /**
@@ -281,14 +295,17 @@ public final class CarbonCommonConstants {
   /**
    * Number of cores to be used while loading
    */
+  @CarbonProperty
   public static final String NUM_CORES_LOADING = "carbon.number.of.cores.while.loading";
   /**
    * Number of cores to be used while compacting
    */
+  @CarbonProperty
   public static final String NUM_CORES_COMPACTING = "carbon.number.of.cores.while.compacting";
   /**
    * Number of cores to be used for block sort
    */
+  @CarbonProperty
   public static final String NUM_CORES_BLOCK_SORT = "carbon.number.of.cores.block.sort";
   /**
    * Default value of number of cores to be used for block sort
@@ -305,6 +322,7 @@ public final class CarbonCommonConstants {
   /**
    * CSV_READ_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String CSV_READ_BUFFER_SIZE = "carbon.csv.read.buffersize.byte";
   /**
    * CSV_READ_BUFFER_SIZE
@@ -355,6 +373,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_MERGE_SORT_READER_THREAD
    */
+  @CarbonProperty
   public static final String CARBON_MERGE_SORT_READER_THREAD = "carbon.merge.sort.reader.thread";
   /**
    * CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE
@@ -363,6 +382,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED
    */
+  @CarbonProperty
   public static final String IS_SORT_TEMP_FILE_COMPRESSION_ENABLED =
       "carbon.is.sort.temp.file.compression.enabled";
   /**
@@ -372,6 +392,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
    */
+  @CarbonProperty
   public static final String SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION =
       "carbon.sort.temp.file.no.of.records.for.compression";
   /**
@@ -390,6 +411,7 @@ public final class CarbonCommonConstants {
    * Property for specifying the format of TIMESTAMP data type column.
    * e.g. yyyy/MM/dd HH:mm:ss, or using CARBON_TIMESTAMP_DEFAULT_FORMAT
    */
+  @CarbonProperty
   public static final String CARBON_TIMESTAMP_FORMAT = "carbon.timestamp.format";
 
   /**
@@ -400,14 +422,17 @@ public final class CarbonCommonConstants {
    * Property for specifying the format of DATE data type column.
    * e.g. yyyy/MM/dd , or using CARBON_DATE_DEFAULT_FORMAT
    */
+  @CarbonProperty
   public static final String CARBON_DATE_FORMAT = "carbon.date.format";
   /**
    * STORE_LOCATION_HDFS
    */
+  @CarbonProperty
   public static final String STORE_LOCATION_HDFS = "carbon.storelocation.hdfs";
   /**
    * STORE_LOCATION_TEMP_PATH
    */
+  @CarbonProperty
   public static final String STORE_LOCATION_TEMP_PATH = "carbon.tempstore.location";
   /**
    * IS_COLUMNAR_STORAGE_DEFAULTVALUE
@@ -424,6 +449,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_INT_BASED_INDEXER
    */
+  @CarbonProperty
   public static final String AGGREAGATE_COLUMNAR_KEY_BLOCK = "aggregate.columnar.keyblock";
   /**
    * IS_INT_BASED_INDEXER_DEFAULTVALUE
@@ -432,6 +458,7 @@ public final class CarbonCommonConstants {
   /**
    * ENABLE_QUERY_STATISTICS
    */
+  @CarbonProperty
   public static final String ENABLE_QUERY_STATISTICS = "enable.query.statistics";
   /**
    * ENABLE_QUERY_STATISTICS_DEFAULT
@@ -440,6 +467,7 @@ public final class CarbonCommonConstants {
   /**
    * TIME_STAT_UTIL_TYPE
    */
+  @CarbonProperty
   public static final String ENABLE_DATA_LOADING_STATISTICS = "enable.data.loading.statistics";
   /**
    * TIME_STAT_UTIL_TYPE_DEFAULT
@@ -448,6 +476,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_INT_BASED_INDEXER
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_VALUE = "high.cardinality.value";
   /**
    * IS_INT_BASED_INDEXER_DEFAULTVALUE
@@ -508,6 +537,7 @@ public final class CarbonCommonConstants {
   /**
    * MAX_QUERY_EXECUTION_TIME
    */
+  @CarbonProperty
   public static final String MAX_QUERY_EXECUTION_TIME = "max.query.execution.time";
   /**
    * CARBON_TIMESTAMP
@@ -529,17 +559,20 @@ public final class CarbonCommonConstants {
   /**
    * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
    */
+  @CarbonProperty
   public static final String NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK =
       "carbon.load.metadata.lock.retries";
   /**
    * MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
    */
+  @CarbonProperty
   public static final String MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK =
       "carbon.load.metadata.lock.retry.timeout.sec";
 
   /**
    * compressor for writing/reading carbondata file
    */
+  @CarbonProperty
   public static final String COMPRESSOR = "carbon.column.compressor";
 
   /**
@@ -596,6 +629,7 @@ public final class CarbonCommonConstants {
   /**
    * The batch size of records which returns to client.
    */
+  @CarbonProperty
   public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
 
   public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
@@ -609,6 +643,7 @@ public final class CarbonCommonConstants {
   /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_DRIVER_LRU_CACHE_SIZE = "carbon.max.driver.lru.cache.size";
   public static final String POSITION_REFERENCE = "positionReference";
   /**
@@ -618,10 +653,12 @@ public final class CarbonCommonConstants {
   /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_LEVEL_CACHE_SIZE = "carbon.max.level.cache.size";
   /**
    * max executor lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE =
       "carbon.max.executor.lru.cache.size";
   /**
@@ -649,6 +686,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_PREFETCH_BUFFERSIZE
    */
+  @CarbonProperty
   public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";
   /**
    * CARBON_PREFETCH_BUFFERSIZE DEFAULT VALUE
@@ -665,6 +703,7 @@ public final class CarbonCommonConstants {
   /**
    * ENABLE_AUTO_LOAD_MERGE
    */
+  @CarbonProperty
   public static final String ENABLE_AUTO_LOAD_MERGE = "carbon.enable.auto.load.merge";
   /**
    * DEFAULT_ENABLE_AUTO_LOAD_MERGE
@@ -675,6 +714,7 @@ public final class CarbonCommonConstants {
    * ZOOKEEPER_ENABLE_LOCK if this is set to true then zookeeper will be used to handle locking
    * mechanism of carbon
    */
+  @CarbonProperty
   public static final String LOCK_TYPE = "carbon.lock.type";
 
   /**
@@ -691,11 +731,13 @@ public final class CarbonCommonConstants {
   /**
    * maximum dictionary chunk size that can be kept in memory while writing dictionary file
    */
+  @CarbonProperty
   public static final String DICTIONARY_ONE_CHUNK_SIZE = "carbon.dictionary.chunk.size";
 
   /**
    *  Dictionary Server Worker Threads
    */
+  @CarbonProperty
   public static final String DICTIONARY_WORKER_THREADS = "dictionary.worker.threads";
 
   /**
@@ -711,6 +753,7 @@ public final class CarbonCommonConstants {
   /**
    * xxhash algorithm property for hashmap
    */
+  @CarbonProperty
   public static final String ENABLE_XXHASH = "carbon.enableXXHash";
 
   /**
@@ -744,6 +787,7 @@ public final class CarbonCommonConstants {
   /**
    * Size of Major Compaction in MBs
    */
+  @CarbonProperty
   public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
 
   /**
@@ -754,6 +798,7 @@ public final class CarbonCommonConstants {
   /**
    * This property is used to tell how many segments to be preserved from merging.
    */
+  @CarbonProperty
   public static final java.lang.String PRESERVE_LATEST_SEGMENTS_NUMBER =
       "carbon.numberof.preserve.segments";
 
@@ -765,6 +810,7 @@ public final class CarbonCommonConstants {
   /**
    * This property will determine the loads of how many days can be compacted.
    */
+  @CarbonProperty
   public static final java.lang.String DAYS_ALLOWED_TO_COMPACT = "carbon.allowed.compaction.days";
 
   /**
@@ -775,6 +821,7 @@ public final class CarbonCommonConstants {
   /**
    * space reserved for writing block meta data in carbon data file
    */
+  @CarbonProperty
   public static final String CARBON_BLOCK_META_RESERVED_SPACE =
       "carbon.block.meta.size.reserved.percentage";
 
@@ -786,6 +833,7 @@ public final class CarbonCommonConstants {
   /**
    * property to enable min max during filter query
    */
+  @CarbonProperty
   public static final String CARBON_QUERY_MIN_MAX_ENABLED = "carbon.enableMinMax";
 
   /**
@@ -797,6 +845,7 @@ public final class CarbonCommonConstants {
    * this variable is to enable/disable prefetch of data during merge sort while
    * reading data from sort temp files
    */
+  @CarbonProperty
   public static final String CARBON_MERGE_SORT_PREFETCH = "carbon.merge.sort.prefetch";
   public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
 
@@ -823,17 +872,27 @@ public final class CarbonCommonConstants {
   /**
    * this variable is to enable/disable identify high cardinality during first data loading
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
 
   /**
    * threshold of high cardinality
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_THRESHOLD = "high.cardinality.threshold";
   public static final String HIGH_CARDINALITY_THRESHOLD_DEFAULT = "1000000";
   public static final int HIGH_CARDINALITY_THRESHOLD_MIN = 10000;
 
   /**
+   * percentage of cardinality in row count
+   */
+  @CarbonProperty
+  public static final String HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE =
+      "high.cardinality.row.count.percentage";
+  public static final String HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT = "80";
+
+  /**
    * 16 mb size
    */
   public static final long CARBON_16MB = 16 * 1024 * 1024;
@@ -871,6 +930,7 @@ public final class CarbonCommonConstants {
   /**
    * Number of unmerged segments to be merged.
    */
+  @CarbonProperty
   public static final String COMPACTION_SEGMENT_LEVEL_THRESHOLD =
       "carbon.compaction.level.threshold";
 
@@ -883,6 +943,7 @@ public final class CarbonCommonConstants {
    * Number of Update Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
+  @CarbonProperty
   public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.update.compaction.threshold";
   /**
@@ -894,6 +955,7 @@ public final class CarbonCommonConstants {
    * Number of Delete Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
+  @CarbonProperty
   public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.delete.compaction.threshold";
   /**
@@ -909,6 +971,7 @@ public final class CarbonCommonConstants {
   /**
    * hive connection url
    */
+  @CarbonProperty
   public static final String HIVE_CONNECTION_URL = "javax.jdo.option.ConnectionURL";
 
   /**
@@ -924,11 +987,13 @@ public final class CarbonCommonConstants {
   /**
    * hdfs temporary directory key
    */
+  @CarbonProperty
   public static final String HDFS_TEMP_LOCATION = "hadoop.tmp.dir";
 
   /**
    * zookeeper url key
    */
+  @CarbonProperty
   public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
 
   /**
@@ -945,6 +1010,7 @@ public final class CarbonCommonConstants {
    * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run at once.
    */
+  @CarbonProperty
   public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
 
   /**
@@ -970,6 +1036,7 @@ public final class CarbonCommonConstants {
   /**
    * carbon data file version property
    */
+  @CarbonProperty
   public static final String CARBON_DATA_FILE_VERSION = "carbon.data.file.version";
 
   /**
@@ -1005,11 +1072,13 @@ public final class CarbonCommonConstants {
   /**
    * to determine to use the rdd persist or not.
    */
+  @CarbonProperty
   public static String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
    * for enabling or disabling Horizontal Compaction.
    */
+  @CarbonProperty
   public static String isHorizontalCompactionEnabled = "carbon.horizontal.compaction.enable";
 
   /**
@@ -1039,6 +1108,7 @@ public final class CarbonCommonConstants {
   /**
    * Maximum waiting time (in seconds) for a query for requested executors to be started
    */
+  @CarbonProperty
   public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT =
       "carbon.max.executor.startup.timeout";
 
@@ -1072,6 +1142,7 @@ public final class CarbonCommonConstants {
   /**
    * to enable offheap sort
    */
+  @CarbonProperty
   public static final String ENABLE_UNSAFE_SORT = "enable.unsafe.sort";
 
   /**
@@ -1082,21 +1153,22 @@ public final class CarbonCommonConstants {
   /**
    * to enable offheap sort
    */
+  @CarbonProperty
   public static final String ENABLE_OFFHEAP_SORT = "enable.offheap.sort";
 
   /**
    * to enable offheap sort
    */
   public static final String ENABLE_OFFHEAP_SORT_DEFAULT = "true";
-
+  @CarbonProperty
   public static final String ENABLE_INMEMORY_MERGE_SORT = "enable.inmemory.merge.sort";
 
   public static final String ENABLE_INMEMORY_MERGE_SORT_DEFAULT = "false";
-
+  @CarbonProperty
   public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB = "offheap.sort.chunk.size.inmb";
 
   public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64";
-
+  @CarbonProperty
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb";
 
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
@@ -1104,7 +1176,10 @@ public final class CarbonCommonConstants {
   /**
    * Sorts the data in batches and writes the batch data to store with index file.
    */
+  @CarbonProperty
   public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope";
+  @CarbonProperty
+  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
 
   /**
    * If set to BATCH_SORT, the sorting scope is smaller and more index tree will be created,
@@ -1120,8 +1195,10 @@ public final class CarbonCommonConstants {
    * Size of batch data to keep in memory, as a thumb rule it supposed
    * to be less than 45% of sort.inmemory.size.inmb otherwise it may spill intermediate data to disk
    */
+  @CarbonProperty
   public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
-
+  public static final String LOAD_BATCH_SORT_SIZE_INMB_DEFAULT = "0";
+  @CarbonProperty
   /**
    * The Number of partitions to use when shuffling data for sort. If user don't configurate or
    * configurate it less than 1, it uses the number of map tasks as reduce tasks. In general, we
@@ -1130,7 +1207,7 @@ public final class CarbonCommonConstants {
   public static final String LOAD_GLOBAL_SORT_PARTITIONS = "carbon.load.global.sort.partitions";
 
   public static final String LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT = "0";
-
+  @CarbonProperty
   public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
 
   public static final String ENABLE_VECTOR_READER_DEFAULT = "true";
@@ -1138,6 +1215,7 @@ public final class CarbonCommonConstants {
   /*
    * carbon dictionary server port
    */
+  @CarbonProperty
   public static final String DICTIONARY_SERVER_PORT = "carbon.dictionary.server.port";
 
   /**
@@ -1148,6 +1226,7 @@ public final class CarbonCommonConstants {
   /**
    * property to set is IS_DRIVER_INSTANCE
    */
+  @CarbonProperty
   public static final String IS_DRIVER_INSTANCE = "is.driver.instance";
 
   /**
@@ -1158,6 +1237,7 @@ public final class CarbonCommonConstants {
   /**
    * property for enabling unsafe based query processing
    */
+  @CarbonProperty
   public static final String ENABLE_UNSAFE_IN_QUERY_EXECUTION = "enable.unsafe.in.query.processing";
 
   /**
@@ -1168,6 +1248,7 @@ public final class CarbonCommonConstants {
   /**
    * property for offheap based processing
    */
+  @CarbonProperty
   public static final String USE_OFFHEAP_IN_QUERY_PROCSSING = "use.offheap.in.query.processing";
 
   /**
@@ -1178,6 +1259,7 @@ public final class CarbonCommonConstants {
   /**
    * whether to prefetch data while loading.
    */
+  @CarbonProperty
   public static final String USE_PREFETCH_WHILE_LOADING = "carbon.loading.prefetch";
 
   /**
@@ -1190,17 +1272,17 @@ public final class CarbonCommonConstants {
   public static final String MAJOR = "major";
 
   public static final String LOCAL_FILE_PREFIX = "file://";
-
+  @CarbonProperty
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
 
   public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
-
+  @CarbonProperty
   public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD =
       "spark.sql.sources.schemaStringLengthThreshold";
 
   public static final int SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT = 4000;
-
+  @CarbonProperty
   public static final String CARBON_BAD_RECORDS_ACTION = "carbon.bad.records.action";
 
   public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
new file mode 100644
index 0000000..ed481bb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.constants;
+
+import org.apache.carbondata.core.util.CarbonProperty;
+
+/**
+ * Keys (all prefixed with "carbon.options.") and some default values of the load options
+ */
+public final class CarbonLoadOptionConstants {
+  /**
+   * option to enable or disable the bad records logger
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE =
+      "carbon.options.bad.records.logger.enable";
+  // default of the logger option. NOTE(review): not declared final, unlike the other defaults
+  public static String CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT = "false";
+  /**
+   * option to specify the bad records action
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORDS_ACTION =
+      "carbon.options.bad.records.action";
+  /**
+   * load option to specify whether empty data is to be treated as a bad record
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD =
+      "carbon.options.is.empty.data.bad.record";
+  public static final String CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT = "false";
+
+  /**
+   * option to specify the date format
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_DATEFORMAT =
+      "carbon.options.dateformat";
+  public static final String CARBON_OPTIONS_DATEFORMAT_DEFAULT = "";
+  /**
+   * option to specify the sort scope
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_SORT_SCOPE =
+      "carbon.options.sort.scope";
+  /**
+   * option to specify the batch sort size in MB
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BATCH_SORT_SIZE_INMB =
+      "carbon.options.batch.sort.size.inmb";
+  /**
+   * option to enable/disable single pass
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_SINGLE_PASS =
+      "carbon.options.single.pass";
+  public static final String CARBON_OPTIONS_SINGLE_PASS_DEFAULT = "false";
+
+  /**
+   * option to specify the bad record path
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORD_PATH =
+      "carbon.options.bad.record.path";
+  /**
+   * option to specify the global sort partitions
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
+      "carbon.options.global.sort.partitions";
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
index 0ce73f0..edc7b9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.constants;
 
+import org.apache.carbondata.core.util.CarbonProperty;
+
 /**
  * Constants for V3 data format
  */
@@ -24,6 +26,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * each blocklet group size in mb
    */
+  @CarbonProperty
   String BLOCKLET_SIZE_IN_MB = "carbon.blockletgroup.size.in.mb";
 
   /**
@@ -39,6 +42,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * number of column to be read in one IO in query
    */
+  @CarbonProperty
   String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
 
   /**
@@ -59,6 +63,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * number of rows per blocklet column page
    */
+  @CarbonProperty
   String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java b/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
new file mode 100644
index 0000000..bef9576
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.exception;
+
+import java.util.Locale;
+
+public class InvalidConfigurationException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message returned by getMessage().
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public InvalidConfigurationException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public InvalidConfigurationException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t The cause; when non-null its description becomes the message.
+   */
+  public InvalidConfigurationException(Throwable t) {
+    super(t);
+    // super(t) derives the detail message from the cause; mirror it in msg so that
+    // getMessage() does not report an empty string for wrapped exceptions
+    this.msg = (null == t) ? "" : super.getMessage();
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - always an empty string; the locale is currently ignored.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /** Returns the localized message as provided by the superclass. */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /** Returns the error message held by this exception. */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 0142e38..c1e70ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -21,13 +21,15 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.lang.reflect.Field;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
@@ -48,10 +50,7 @@ public final class CarbonProperties {
    */
   private Properties carbonProperties;
 
-  /**
-   * Added properties on the fly.
-   */
-  private Map<String, String> setProperties = new HashMap<>();
+  private Set<String> propertySet = new HashSet<String>();
 
   /**
    * Private constructor this will call load properties method to load all the
@@ -77,6 +76,11 @@ public final class CarbonProperties {
    * values in case of wrong values.
    */
   private void validateAndLoadDefaultProperties() {
+    try {
+      initPropertySet();
+    } catch (IllegalAccessException e) {
+      LOGGER.error("Illelagal access to declared field" + e.getMessage());
+    }
     if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
       carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
           CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
@@ -86,7 +90,6 @@ public final class CarbonProperties {
     validateNumCores();
     validateNumCoresBlockSort();
     validateSortSize();
-    validateBadRecordsLocation();
     validateHighCardinalityIdentify();
     validateHighCardinalityThreshold();
     validateCarbonDataFileVersion();
@@ -97,6 +100,27 @@ public final class CarbonProperties {
     validateNumberOfRowsPerBlockletColumnPage();
   }
 
+  /**
+   * Collects the values of all constants annotated with CarbonProperty from the constant
+   * holder classes into propertySet, which backs isCarbonProperty(key).
+   */
+  private void initPropertySet() throws IllegalAccessException {
+    // constant holders whose annotated fields declare the supported property keys
+    Class<?>[] constantHolders = {
+        CarbonCommonConstants.class,
+        CarbonV3DataFormatConstants.class,
+        CarbonLoadOptionConstants.class
+    };
+    for (Class<?> holder : constantHolders) {
+      for (Field field : holder.getDeclaredFields()) {
+        if (field.isAnnotationPresent(CarbonProperty.class)) {
+          // the annotated constants are expected to be static, so no receiver is passed
+          propertySet.add(field.get(null).toString());
+        }
+      }
+    }
+  }
+
   private void validatePrefetchBufferSize() {
     String prefetchBufferSizeStr =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
@@ -202,15 +226,6 @@ public final class CarbonProperties {
     }
   }
 
-  private void validateBadRecordsLocation() {
-    String badRecordsLocation =
-        carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    if (null == badRecordsLocation || badRecordsLocation.length() == 0) {
-      carbonProperties.setProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
-    }
-  }
-
   /**
    * This method validates the blocklet size
    */
@@ -425,6 +440,12 @@ public final class CarbonProperties {
    * @return properties value
    */
   public String getProperty(String key) {
+    // get the property value from session parameters,
+    // if it is null then get value from carbonProperties
+    String sessionPropertyValue = getSessionPropertyValue(key);
+    if (null != sessionPropertyValue) {
+      return sessionPropertyValue;
+    }
     //TODO temporary fix
     if ("carbon.leaf.node.size".equals(key)) {
       return "120000";
@@ -433,6 +454,25 @@ public final class CarbonProperties {
   }
 
   /**
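+   * returns the session property value for the given key
+   *
+   * @param key property key to look up
+   * @return value from the current thread's session params, or null when none is available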
+   */
+  private String getSessionPropertyValue(String key) {
+    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
+    if (null == carbonSessionInfo) {
+      return null;
+    }
+    // reuse the instance checked above instead of reading the thread-local a second time
+    SessionParams sessionParams = carbonSessionInfo.getSessionParams();
+    if (null == sessionParams) {
+      return null;
+    }
+    return sessionParams.getProperty(key);
+  }
+
+  /**
    * This method will be used to get the properties value if property is not
    * present then it will return tghe default value
    *
@@ -454,26 +494,10 @@ public final class CarbonProperties {
    * @return properties value
    */
   public CarbonProperties addProperty(String key, String value) {
-    setProperties.put(key, value);
     carbonProperties.setProperty(key, value);
     return this;
   }
 
-  /**
-   * Get all the added properties.
-   * @return
-   */
-  public Map<String, String> getAddedProperies() {
-    return setProperties;
-  }
-
-  public void setProperties(Map<String, String> newProperties) {
-    setProperties.putAll(newProperties);
-    for (Map.Entry<String, String> entry : newProperties.entrySet()) {
-      carbonProperties.setProperty(entry.getKey(), entry.getValue());
-    }
-  }
-
   private ColumnarFormatVersion getDefaultFormatVersion() {
     return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
   }
@@ -748,4 +772,13 @@ public final class CarbonProperties {
     }
     return numberOfDeltaFilesThreshold;
   }
+
+  /**
+   * returns true if the key is a carbon property
+   * @param key property key to look up
+   * @return true if propertySet contains the key, otherwise false
+   */
+  public boolean isCarbonProperty(String key) {
+    return propertySet.contains(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
new file mode 100644
index 0000000..2970a89
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * CarbonProperty Annotation
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CarbonProperty {
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
new file mode 100644
index 0000000..1a82f1d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+
+/**
+ * This class maintains carbon session information details
+ */
+public class CarbonSessionInfo implements Serializable {
+
+  // contains carbon session param details
+  private SessionParams sessionParams;
+
+  public SessionParams getSessionParams() {
+    return sessionParams;
+  }
+
+  public void setSessionParams(SessionParams sessionParams) {
+    this.sessionParams = sessionParams;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 200d5ca..f409551 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -326,10 +327,13 @@ public final class CarbonUtil {
   }
 
   public static String getBadLogPath(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
     return badLogStoreLocation;
   }
 
@@ -1647,5 +1651,69 @@ public final class CarbonUtil {
         throw new IllegalArgumentException("Int cannot me more than 4 bytes");
     }
   }
+  /**
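+   * Validate boolean value configuration
+   *
+   * @param value configured value, may be null
+   * @return true only if value is "true" or "false" ignoring case, otherwise false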
+   */
+  public static boolean validateBoolean(String value) {
+    if (null == value) {
+      return false;
+    } else if (!("false".equalsIgnoreCase(value) || "true".equalsIgnoreCase(value))) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
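+   * validate the sort scope
+   * @param sortScope configured sort scope, may be null
+   * @return true if it is BATCH_SORT, LOCAL_SORT, NO_SORT or GLOBAL_SORT ignoring case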
+   */
+  public static boolean isValidSortOption(String sortScope) {
+    if (sortScope == null) {
+      return false;
+    }
+    switch (sortScope.toUpperCase()) {
+      case "BATCH_SORT":
+        return true;
+      case "LOCAL_SORT":
+        return true;
+      case "NO_SORT":
+        return true;
+      case "GLOBAL_SORT":
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
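+   * validate that the value is a valid int
+   *
+   * @param value configured value, may be null
+   * @return true if value is non-null and Integer.parseInt accepts it, otherwise false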
+   */
+  public static boolean validateValidIntType(String value) {
+    if (null == value) {
+      return false;
+    }
+    try {
+      Integer.parseInt(value);
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
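+   * checks that a bad records location is configured
+   * @param badRecordsLocation configured bad records location, may be null
+   * @return true if the location is neither null nor empty, otherwise false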
+   */
+  public static boolean isValidBadStorePath(String badRecordsLocation) {
+    return !(null == badRecordsLocation || badRecordsLocation.length() == 0);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 781b898..f06ba01 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -1,26 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.carbondata.core.util;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.*;
+
 /**
- * Created by root1 on 19/5/17.
+ * This class maintains carbon session params
  */
 public class SessionParams implements Serializable {
 
-  protected transient CarbonProperties properties;
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheProvider.class.getName());
 
   private Map<String, String> sProps;
 
   public SessionParams() {
     sProps = new HashMap<>();
-    properties = CarbonProperties.getInstance();
-  }
-
-  public SessionParams(SessionParams sessionParams) {
-    this();
-    sProps.putAll(sessionParams.sProps);
   }
 
   /**
@@ -30,41 +52,90 @@ public class SessionParams implements Serializable {
    * @return properties value
    */
   public String getProperty(String key) {
-    String s = sProps.get(key);
-    if (key == null) {
-      s = properties.getProperty(key);
-    }
-    return s;
+    return sProps.get(key);
   }
 
   /**
-   * This method will be used to get the properties value if property is not
-   * present then it will return tghe default value
+   * This method will be used to add a new property
    *
    * @param key
    * @return properties value
    */
-  public String getProperty(String key, String defaultValue) {
-    String value = sProps.get(key);
-    if (key == null) {
-      value = properties.getProperty(key, defaultValue);
+  public SessionParams addProperty(String key, String value) throws InvalidConfigurationException {
+    boolean isValidConf = validateKeyValue(key, value);
+    if (isValidConf) {
+      LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
+      sProps.put(key, value);
     }
-    return value;
+    return this;
   }
 
   /**
-   * This method will be used to add a new property
-   *
+   * validate the key value to be set using set command
    * @param key
-   * @return properties value
+   * @param value
+   * @return
+   * @throws InvalidConfigurationException
    */
-  public SessionParams addProperty(String key, String value) {
-    sProps.put(key, value);
-    return this;
+  private boolean validateKeyValue(String key, String value) throws InvalidConfigurationException {
+    boolean isValid = false;
+    switch (key) {
+      case ENABLE_UNSAFE_SORT:
+      case CARBON_CUSTOM_BLOCK_DISTRIBUTION:
+      case CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE:
+      case CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD:
+      case CARBON_OPTIONS_SINGLE_PASS:
+        isValid = CarbonUtil.validateBoolean(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+        }
+        break;
+      case CARBON_OPTIONS_BAD_RECORDS_ACTION:
+        try {
+          LoggerAction.valueOf(value.toUpperCase());
+          isValid = true;
+        } catch (IllegalArgumentException | NullPointerException e) { // NPE: value was null
+          throw new InvalidConfigurationException(
+              "The key " + key + " can have only either FORCE or IGNORE or REDIRECT.");
+        }
+        break;
+      case CARBON_OPTIONS_SORT_SCOPE:
+        isValid = CarbonUtil.isValidSortOption(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("The sort scope " + key + " can have only either"
+              + " BATCH_SORT or LOCAL_SORT or NO_SORT or GLOBAL_SORT.");
+        }
+        break;
+      case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
+      case CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS:
+        isValid = CarbonUtil.validateValidIntType(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException(
+              "The configured value for key " + key + " must be valid integer.");
+        }
+        break;
+      case CARBON_OPTIONS_BAD_RECORD_PATH:
+        isValid = CarbonUtil.isValidBadStorePath(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("Invalid bad records location.");
+        }
+        break;
+      // no validation needed while set for CARBON_OPTIONS_DATEFORMAT
+      case CARBON_OPTIONS_DATEFORMAT:
+        isValid = true;
+        break;
+      default:
+        throw new InvalidConfigurationException(
+            "The key " + key + " not supported for dynamic configuration.");
+    }
+    return isValid;
   }
 
-  public void setProperties(Map<String, String> newProperties) {
-    sProps.putAll(newProperties);
+  /**
+   * clear the set properties
+   */
+  public void clear() {
+    sProps.clear();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
new file mode 100644
index 0000000..df525bc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * This class maintains ThreadLocal session params
+ */
+public class ThreadLocalSessionInfo {
+  static final InheritableThreadLocal<CarbonSessionInfo> threadLocal =
+      new InheritableThreadLocal<CarbonSessionInfo>();
+
+  public static void setCarbonSessionInfo(CarbonSessionInfo carbonSessionInfo) {
+    threadLocal.set(carbonSessionInfo);
+  }
+
+  public static CarbonSessionInfo getCarbonSessionInfo() {
+    return threadLocal.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
deleted file mode 100644
index 354a0ee..0000000
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.util;
-
-/**
- * This class maintains ThreadLocal session params
- */
-public class ThreadLocalSessionParams {
-  static final InheritableThreadLocal<SessionParams> threadLocal =
-      new InheritableThreadLocal<SessionParams>();
-
-  public static void setSessionParams(SessionParams sessionParams) {
-    threadLocal.set(sessionParams);
-  }
-
-  public static SessionParams getSessionParams() {
-    return threadLocal.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 6e6f2bd..9aa1188 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -23,8 +23,10 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonProjection;
@@ -51,7 +53,10 @@ public class CarbonInputMapperTest extends TestCase {
 
   // changed setUp to static init block to avoid un wanted multiple time store creation
   static {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
+
   }
 
   @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
index 60fee95..bf347c5 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
@@ -23,6 +23,9 @@ import java.util.List;
 import java.util.UUID;
 
 import junit.framework.TestCase;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.test.util.StoreCreator;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +40,8 @@ import org.junit.Test;
 public class InputFilesTest extends TestCase {
   @Before
   public void setUp() throws Exception {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
     // waiting 3s to finish table create and data loading
     Thread.sleep(3000L);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
deleted file mode 100644
index 28e2dbf..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.commands
-
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SetCommandTestCase  extends QueryTest with BeforeAndAfterAll {
-
-  test("test set command") {
-
-    sql("set key1=value1")
-
-    assert(CarbonProperties.getInstance().getProperty("key1").equals("value1"), "Set command does not work" )
-    assert(sqlContext.getConf("key1").equals("value1"), "Set command does not work" )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2842a16..3f5be84 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -338,9 +338,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       .addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
         CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
 
-    sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = ${CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT}")
-    sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = " +
-      s"${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT}")
+    // sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = ${CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT}")
+    // sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = " +
+    //  s"${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT}")
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index 6fb11b3..4ccd49e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -71,12 +71,12 @@ class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterA
     try {
       sql(s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
-           OPTIONS('dateformat' = '')
+           OPTIONS('dateformat' = 'date')
            """)
       assert(false)
     } catch {
       case ex: MalformedCarbonCommandException =>
-        assertResult(ex.getMessage)("Error: Option DateFormat is set an empty string.")
+        assertResult(ex.getMessage)("Error: Option DateFormat is not provided for Column date.")
       case _: Throwable=> assert(false)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index ae951bd..f2a4a7d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -28,11 +28,8 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 object ValidateUtil {
   def validateDateFormat(dateFormat: String, table: CarbonTable, tableName: String): Unit = {
     val dimensions = table.getDimensionByTableName(tableName).asScala
-    if (dateFormat != null) {
-      if (dateFormat.trim == "") {
-        throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-          "string.")
-      } else {
+    // allowing empty value to be configured for dateformat option.
+    if (dateFormat != null && dateFormat.trim != "") {
         val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
           val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
@@ -49,7 +46,6 @@ object ValidateUtil {
           }
         }
       }
-    }
   }
 
   def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index e00dd0f..106a9fd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 
 /**
  * This RDD maintains session level ThreadLocal
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
 
-  val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams
+  val carbonSessionInfo: CarbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
@@ -40,7 +40,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     internalCompute(split, context)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 5e37f63..383d308 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
@@ -827,7 +827,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT",
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
       "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
       "GLOBAL_SORT_PARTITIONS"
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index a01ccb2..b76bca3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -54,6 +54,7 @@ object TestQueryExecutor {
   CarbonProperties.getInstance()
     .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
     .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+    .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
       .iterator().next().getClass

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 65235e6..3579b8a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel,
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
@@ -53,7 +54,6 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d085ad7..ba22c3c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -412,7 +413,16 @@ case class LoadTable(
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
       val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
-
+      val bad_record_path = options.getOrElse("bad_record_path",
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+      if (badRecordsLoggerEnable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
@@ -730,13 +740,6 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
         }
-        // delete bad record log after drop table
-        val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
-        val badLogFileType = FileFactory.getFileType(badLogPath)
-        if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-          val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 48af516..5c20808 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel,
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
@@ -53,7 +54,6 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 7c096d3..d28044f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.processing.merger.TableMeta
@@ -53,8 +53,8 @@ case class CarbonDatasourceHadoopRelation(
       absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
     .asInstanceOf[CarbonRelation]
 
-  val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
-  ThreadLocalSessionParams.setSessionParams(sessionParams)
+  val carbonSessionInfo : CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+  ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 78820ea..925b82b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -38,6 +38,8 @@ class CarbonEnv {
 
   var sessionParams: SessionParams = _
 
+  var carbonSessionInfo: CarbonSessionInfo = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -48,8 +50,10 @@ class CarbonEnv {
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
+      carbonSessionInfo = new CarbonSessionInfo()
       sessionParams = new SessionParams()
-      ThreadLocalSessionParams.setSessionParams(sessionParams)
+      carbonSessionInfo.setSessionParams(sessionParams)
+      ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
       val config = new CarbonSQLConf(sparkSession)
       if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
         config.addDefaultCarbonParams()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index a4feead..d2022be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -49,7 +49,7 @@ case class CarbonSetCommand(command: SetCommand)
   override val output = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams
+    val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
     command.kv match {
       case Some((key, Some(value))) =>
         val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
@@ -68,7 +68,7 @@ case class CarbonResetCommand()
   override val output = ResetCommand.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.getInstance(sparkSession).sessionParams.clear()
+    CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams.clear()
     ResetCommand.run(sparkSession)
   }
 }