You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/28 16:41:04 UTC

[07/20] carbondata git commit: 1. Refactored the bad record code, by default the bad record path will be empty, if bad record logger is enabled or action is redirect and bad record path is not configured then data-load will fail. 2. Support dynamic se

1. Refactored the bad record code, by default the bad record path will be empty, if bad record logger is    enabled or action is redirect and bad record path is not configured then data-load will fail. 2. Support dynamic set command for some of load options 3. fixed test cases 4. Added validation for the supported property in the dynamic set command 5. Change table delete behavior // now the bad record would not be deleted ion table drop 6. added test case for bad record path in load option 7. fixed failing test cases 8. Added "carbon.options." in load options parameters


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/39644b5e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/39644b5e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/39644b5e

Branch: refs/heads/encoding_override
Commit: 39644b5e003bddf89c80ad539506b4a29b04b526
Parents: 95ce1da
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Mon Jun 12 18:33:22 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Tue Jun 27 16:10:28 2017 +0530

----------------------------------------------------------------------
 .../common/constants/LoggerAction.java          |  38 +++++
 .../core/constants/CarbonCommonConstants.java   | 100 ++++++++++-
 .../constants/CarbonLoadOptionConstants.java    |  88 ++++++++++
 .../constants/CarbonV3DataFormatConstants.java  |   5 +
 .../InvalidConfigurationException.java          |  87 ++++++++++
 .../carbondata/core/util/CarbonProperties.java  |  97 +++++++----
 .../carbondata/core/util/CarbonProperty.java    |  28 ++++
 .../carbondata/core/util/CarbonSessionInfo.java |  38 +++++
 .../apache/carbondata/core/util/CarbonUtil.java |  74 ++++++++-
 .../carbondata/core/util/SessionParams.java     | 127 ++++++++++----
 .../core/util/ThreadLocalSessionInfo.java       |  34 ++++
 .../core/util/ThreadLocalSessionParams.java     |  34 ----
 .../hadoop/ft/CarbonInputMapperTest.java        |   5 +
 .../carbondata/hadoop/ft/InputFilesTest.java    |   5 +
 .../testsuite/commands/SetCommandTestCase.scala |  34 ----
 .../dataload/TestGlobalSortDataLoad.scala       |   6 +-
 .../TestLoadDataWithDiffTimestampFormat.scala   |   4 +-
 .../carbondata/spark/load/ValidateUtil.scala    |   8 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala |   6 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   4 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   1 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../execution/command/carbonTableSchema.scala   |  19 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   8 +-
 .../execution/command/CarbonHiveCommands.scala  |   4 +-
 .../execution/command/carbonTableSchema.scala   |  68 +++++---
 .../spark/sql/internal/CarbonSqlConf.scala      | 144 ++++++++++++++++
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   6 +-
 .../BadRecordPathLoadOptionTest.scala           |  87 ++++++++++
 .../DataLoadFailAllTypeSortTest.scala           |   1 -
 .../commands/SetCommandTestCase.scala           | 165 +++++++++++++++++++
 .../processing/constants/LoggerAction.java      |  38 -----
 .../processing/model/CarbonLoadModel.java       |  14 ++
 .../newflow/DataLoadProcessBuilder.java         |   3 +
 .../newflow/sort/SortScopeOptions.java          |  17 +-
 .../steps/DataConverterProcessorStepImpl.java   |  25 ++-
 ...ConverterProcessorWithBucketingStepImpl.java |  23 ++-
 .../util/CarbonDataProcessorUtil.java           |  16 +-
 .../carbon/datastore/BlockIndexStoreTest.java   |   3 +-
 41 files changed, 1205 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java b/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
new file mode 100644
index 0000000..9c027fe
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/constants/LoggerAction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.constants;
+
+/**
+ * enum to hold the bad record logger action
+ */
+public enum LoggerAction {
+
+  FORCE("FORCE"), // data will be converted to null
+  REDIRECT("REDIRECT"), // no null conversion moved to bad record and written to raw csv
+  IGNORE("IGNORE"), // no null conversion moved to bad record and not written to raw csv
+  FAIL("FAIL");  //data loading will fail if a bad record is found
+  private String name;
+
+  LoggerAction(String name) {
+    this.name = name;
+  }
+
+  @Override public String toString() {
+    return this.name;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index a9b2eb7..208bab8 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -19,6 +19,8 @@ package org.apache.carbondata.core.constants;
 
 import java.nio.charset.Charset;
 
+import org.apache.carbondata.core.util.CarbonProperty;
+
 public final class CarbonCommonConstants {
   /**
    * integer size in bytes
@@ -51,18 +53,22 @@ public final class CarbonCommonConstants {
   /**
    * location of the carbon member, hierarchy and fact files
    */
+  @CarbonProperty
   public static final String STORE_LOCATION = "carbon.storelocation";
   /**
    * blocklet size in carbon file
    */
+  @CarbonProperty
   public static final String BLOCKLET_SIZE = "carbon.blocklet.size";
   /**
    * Number of cores to be used
    */
+  @CarbonProperty
   public static final String NUM_CORES = "carbon.number.of.cores";
   /**
    * carbon sort size
    */
+  @CarbonProperty
   public static final String SORT_SIZE = "carbon.sort.size";
   /**
    * default location of the carbon member, hierarchy and fact files
@@ -123,6 +129,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_DDL_BASE_HDFS_URL
    */
+  @CarbonProperty
   public static final String CARBON_DDL_BASE_HDFS_URL = "carbon.ddl.base.hdfs.url";
   /**
    * Load Folder Name
@@ -139,6 +146,7 @@ public final class CarbonCommonConstants {
   /**
    * FS_DEFAULT_FS
    */
+  @CarbonProperty
   public static final String FS_DEFAULT_FS = "fs.defaultFS";
   /**
    * BYTEBUFFER_SIZE
@@ -182,11 +190,12 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_BADRECORDS_LOCATION
    */
+  @CarbonProperty
   public static final String CARBON_BADRECORDS_LOC = "carbon.badRecords.location";
   /**
    * CARBON_BADRECORDS_LOCATION_DEFAULT
    */
-  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "/tmp/carbon/badRecords";
+  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "";
   /**
    * HIERARCHY_FILE_EXTENSION
    */
@@ -220,6 +229,7 @@ public final class CarbonCommonConstants {
   /**
    * GRAPH_ROWSET_SIZE
    */
+  @CarbonProperty
   public static final String GRAPH_ROWSET_SIZE = "carbon.graph.rowset.size";
   /**
    * GRAPH_ROWSET_SIZE_DEFAULT
@@ -244,6 +254,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_INTERMEDIATE_FILES_LIMIT
    */
+  @CarbonProperty
   public static final String SORT_INTERMEDIATE_FILES_LIMIT = "carbon.sort.intermediate.files.limit";
   /**
    * SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE
@@ -260,10 +271,12 @@ public final class CarbonCommonConstants {
   /**
    * SORT_FILE_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String SORT_FILE_BUFFER_SIZE = "carbon.sort.file.buffer.size";
   /**
    * no.of records after which counter to be printed
    */
+  @CarbonProperty
   public static final String DATA_LOAD_LOG_COUNTER = "carbon.load.log.counter";
   /**
    * DATA_LOAD_LOG_COUNTER_DEFAULT_COUNTER
@@ -272,6 +285,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_FILE_WRITE_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String CARBON_SORT_FILE_WRITE_BUFFER_SIZE =
       "carbon.sort.file.write.buffer.size";
   /**
@@ -281,14 +295,17 @@ public final class CarbonCommonConstants {
   /**
    * Number of cores to be used while loading
    */
+  @CarbonProperty
   public static final String NUM_CORES_LOADING = "carbon.number.of.cores.while.loading";
   /**
    * Number of cores to be used while compacting
    */
+  @CarbonProperty
   public static final String NUM_CORES_COMPACTING = "carbon.number.of.cores.while.compacting";
   /**
    * Number of cores to be used for block sort
    */
+  @CarbonProperty
   public static final String NUM_CORES_BLOCK_SORT = "carbon.number.of.cores.block.sort";
   /**
    * Default value of number of cores to be used for block sort
@@ -305,6 +322,7 @@ public final class CarbonCommonConstants {
   /**
    * CSV_READ_BUFFER_SIZE
    */
+  @CarbonProperty
   public static final String CSV_READ_BUFFER_SIZE = "carbon.csv.read.buffersize.byte";
   /**
    * CSV_READ_BUFFER_SIZE
@@ -355,6 +373,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_MERGE_SORT_READER_THREAD
    */
+  @CarbonProperty
   public static final String CARBON_MERGE_SORT_READER_THREAD = "carbon.merge.sort.reader.thread";
   /**
    * CARBON_MERGE_SORT_READER_THREAD_DEFAULTVALUE
@@ -363,6 +382,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_SORT_TEMP_FILE_COMPRESSION_ENABLED
    */
+  @CarbonProperty
   public static final String IS_SORT_TEMP_FILE_COMPRESSION_ENABLED =
       "carbon.is.sort.temp.file.compression.enabled";
   /**
@@ -372,6 +392,7 @@ public final class CarbonCommonConstants {
   /**
    * SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION
    */
+  @CarbonProperty
   public static final String SORT_TEMP_FILE_NO_OF_RECORDS_FOR_COMPRESSION =
       "carbon.sort.temp.file.no.of.records.for.compression";
   /**
@@ -390,6 +411,7 @@ public final class CarbonCommonConstants {
    * Property for specifying the format of TIMESTAMP data type column.
    * e.g. yyyy/MM/dd HH:mm:ss, or using CARBON_TIMESTAMP_DEFAULT_FORMAT
    */
+  @CarbonProperty
   public static final String CARBON_TIMESTAMP_FORMAT = "carbon.timestamp.format";
 
   /**
@@ -400,14 +422,17 @@ public final class CarbonCommonConstants {
    * Property for specifying the format of DATE data type column.
    * e.g. yyyy/MM/dd , or using CARBON_DATE_DEFAULT_FORMAT
    */
+  @CarbonProperty
   public static final String CARBON_DATE_FORMAT = "carbon.date.format";
   /**
    * STORE_LOCATION_HDFS
    */
+  @CarbonProperty
   public static final String STORE_LOCATION_HDFS = "carbon.storelocation.hdfs";
   /**
    * STORE_LOCATION_TEMP_PATH
    */
+  @CarbonProperty
   public static final String STORE_LOCATION_TEMP_PATH = "carbon.tempstore.location";
   /**
    * IS_COLUMNAR_STORAGE_DEFAULTVALUE
@@ -424,6 +449,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_INT_BASED_INDEXER
    */
+  @CarbonProperty
   public static final String AGGREAGATE_COLUMNAR_KEY_BLOCK = "aggregate.columnar.keyblock";
   /**
    * IS_INT_BASED_INDEXER_DEFAULTVALUE
@@ -432,6 +458,7 @@ public final class CarbonCommonConstants {
   /**
    * ENABLE_QUERY_STATISTICS
    */
+  @CarbonProperty
   public static final String ENABLE_QUERY_STATISTICS = "enable.query.statistics";
   /**
    * ENABLE_QUERY_STATISTICS_DEFAULT
@@ -440,6 +467,7 @@ public final class CarbonCommonConstants {
   /**
    * TIME_STAT_UTIL_TYPE
    */
+  @CarbonProperty
   public static final String ENABLE_DATA_LOADING_STATISTICS = "enable.data.loading.statistics";
   /**
    * TIME_STAT_UTIL_TYPE_DEFAULT
@@ -448,6 +476,7 @@ public final class CarbonCommonConstants {
   /**
    * IS_INT_BASED_INDEXER
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_VALUE = "high.cardinality.value";
   /**
    * IS_INT_BASED_INDEXER_DEFAULTVALUE
@@ -508,6 +537,7 @@ public final class CarbonCommonConstants {
   /**
    * MAX_QUERY_EXECUTION_TIME
    */
+  @CarbonProperty
   public static final String MAX_QUERY_EXECUTION_TIME = "max.query.execution.time";
   /**
    * CARBON_TIMESTAMP
@@ -529,17 +559,20 @@ public final class CarbonCommonConstants {
   /**
    * NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
    */
+  @CarbonProperty
   public static final String NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK =
       "carbon.load.metadata.lock.retries";
   /**
    * MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
    */
+  @CarbonProperty
   public static final String MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK =
       "carbon.load.metadata.lock.retry.timeout.sec";
 
   /**
    * compressor for writing/reading carbondata file
    */
+  @CarbonProperty
   public static final String COMPRESSOR = "carbon.column.compressor";
 
   /**
@@ -596,6 +629,7 @@ public final class CarbonCommonConstants {
   /**
    * The batch size of records which returns to client.
    */
+  @CarbonProperty
   public static final String DETAIL_QUERY_BATCH_SIZE = "carbon.detail.batch.size";
 
   public static final int DETAIL_QUERY_BATCH_SIZE_DEFAULT = 100;
@@ -609,6 +643,7 @@ public final class CarbonCommonConstants {
   /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_DRIVER_LRU_CACHE_SIZE = "carbon.max.driver.lru.cache.size";
   public static final String POSITION_REFERENCE = "positionReference";
   /**
@@ -618,10 +653,12 @@ public final class CarbonCommonConstants {
   /**
    * max driver lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_LEVEL_CACHE_SIZE = "carbon.max.level.cache.size";
   /**
    * max executor lru cache size upto which lru cache will be loaded in memory
    */
+  @CarbonProperty
   public static final String CARBON_MAX_EXECUTOR_LRU_CACHE_SIZE =
       "carbon.max.executor.lru.cache.size";
   /**
@@ -649,6 +686,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_PREFETCH_BUFFERSIZE
    */
+  @CarbonProperty
   public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";
   /**
    * CARBON_PREFETCH_BUFFERSIZE DEFAULT VALUE
@@ -665,6 +703,7 @@ public final class CarbonCommonConstants {
   /**
    * ENABLE_AUTO_LOAD_MERGE
    */
+  @CarbonProperty
   public static final String ENABLE_AUTO_LOAD_MERGE = "carbon.enable.auto.load.merge";
   /**
    * DEFAULT_ENABLE_AUTO_LOAD_MERGE
@@ -675,6 +714,7 @@ public final class CarbonCommonConstants {
    * ZOOKEEPER_ENABLE_LOCK if this is set to true then zookeeper will be used to handle locking
    * mechanism of carbon
    */
+  @CarbonProperty
   public static final String LOCK_TYPE = "carbon.lock.type";
 
   /**
@@ -691,11 +731,13 @@ public final class CarbonCommonConstants {
   /**
    * maximum dictionary chunk size that can be kept in memory while writing dictionary file
    */
+  @CarbonProperty
   public static final String DICTIONARY_ONE_CHUNK_SIZE = "carbon.dictionary.chunk.size";
 
   /**
    *  Dictionary Server Worker Threads
    */
+  @CarbonProperty
   public static final String DICTIONARY_WORKER_THREADS = "dictionary.worker.threads";
 
   /**
@@ -711,6 +753,7 @@ public final class CarbonCommonConstants {
   /**
    * xxhash algorithm property for hashmap
    */
+  @CarbonProperty
   public static final String ENABLE_XXHASH = "carbon.enableXXHash";
 
   /**
@@ -744,6 +787,7 @@ public final class CarbonCommonConstants {
   /**
    * Size of Major Compaction in MBs
    */
+  @CarbonProperty
   public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
 
   /**
@@ -754,6 +798,7 @@ public final class CarbonCommonConstants {
   /**
    * This property is used to tell how many segments to be preserved from merging.
    */
+  @CarbonProperty
   public static final java.lang.String PRESERVE_LATEST_SEGMENTS_NUMBER =
       "carbon.numberof.preserve.segments";
 
@@ -765,6 +810,7 @@ public final class CarbonCommonConstants {
   /**
    * This property will determine the loads of how many days can be compacted.
    */
+  @CarbonProperty
   public static final java.lang.String DAYS_ALLOWED_TO_COMPACT = "carbon.allowed.compaction.days";
 
   /**
@@ -775,6 +821,7 @@ public final class CarbonCommonConstants {
   /**
    * space reserved for writing block meta data in carbon data file
    */
+  @CarbonProperty
   public static final String CARBON_BLOCK_META_RESERVED_SPACE =
       "carbon.block.meta.size.reserved.percentage";
 
@@ -786,6 +833,7 @@ public final class CarbonCommonConstants {
   /**
    * property to enable min max during filter query
    */
+  @CarbonProperty
   public static final String CARBON_QUERY_MIN_MAX_ENABLED = "carbon.enableMinMax";
 
   /**
@@ -797,6 +845,7 @@ public final class CarbonCommonConstants {
    * this variable is to enable/disable prefetch of data during merge sort while
    * reading data from sort temp files
    */
+  @CarbonProperty
   public static final String CARBON_MERGE_SORT_PREFETCH = "carbon.merge.sort.prefetch";
   public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
 
@@ -823,17 +872,27 @@ public final class CarbonCommonConstants {
   /**
    * this variable is to enable/disable identify high cardinality during first data loading
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
 
   /**
    * threshold of high cardinality
    */
+  @CarbonProperty
   public static final String HIGH_CARDINALITY_THRESHOLD = "high.cardinality.threshold";
   public static final String HIGH_CARDINALITY_THRESHOLD_DEFAULT = "1000000";
   public static final int HIGH_CARDINALITY_THRESHOLD_MIN = 10000;
 
   /**
+   * percentage of cardinality in row count
+   */
+  @CarbonProperty
+  public static final String HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE =
+      "high.cardinality.row.count.percentage";
+  public static final String HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT = "80";
+
+  /**
    * 16 mb size
    */
   public static final long CARBON_16MB = 16 * 1024 * 1024;
@@ -871,6 +930,7 @@ public final class CarbonCommonConstants {
   /**
    * Number of unmerged segments to be merged.
    */
+  @CarbonProperty
   public static final String COMPACTION_SEGMENT_LEVEL_THRESHOLD =
       "carbon.compaction.level.threshold";
 
@@ -883,6 +943,7 @@ public final class CarbonCommonConstants {
    * Number of Update Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
+  @CarbonProperty
   public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.update.compaction.threshold";
   /**
@@ -894,6 +955,7 @@ public final class CarbonCommonConstants {
    * Number of Delete Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
+  @CarbonProperty
   public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.delete.compaction.threshold";
   /**
@@ -909,6 +971,7 @@ public final class CarbonCommonConstants {
   /**
    * hive connection url
    */
+  @CarbonProperty
   public static final String HIVE_CONNECTION_URL = "javax.jdo.option.ConnectionURL";
 
   /**
@@ -924,11 +987,13 @@ public final class CarbonCommonConstants {
   /**
    * hdfs temporary directory key
    */
+  @CarbonProperty
   public static final String HDFS_TEMP_LOCATION = "hadoop.tmp.dir";
 
   /**
    * zookeeper url key
    */
+  @CarbonProperty
   public static final String ZOOKEEPER_URL = "spark.deploy.zookeeper.url";
 
   /**
@@ -945,6 +1010,7 @@ public final class CarbonCommonConstants {
    * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run at once.
    */
+  @CarbonProperty
   public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
 
   /**
@@ -970,6 +1036,7 @@ public final class CarbonCommonConstants {
   /**
    * carbon data file version property
    */
+  @CarbonProperty
   public static final String CARBON_DATA_FILE_VERSION = "carbon.data.file.version";
 
   /**
@@ -1005,11 +1072,13 @@ public final class CarbonCommonConstants {
   /**
    * to determine to use the rdd persist or not.
    */
+  @CarbonProperty
   public static String isPersistEnabled = "carbon.update.persist.enable";
 
   /**
    * for enabling or disabling Horizontal Compaction.
    */
+  @CarbonProperty
   public static String isHorizontalCompactionEnabled = "carbon.horizontal.compaction.enable";
 
   /**
@@ -1039,6 +1108,7 @@ public final class CarbonCommonConstants {
   /**
    * Maximum waiting time (in seconds) for a query for requested executors to be started
    */
+  @CarbonProperty
   public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT =
       "carbon.max.executor.startup.timeout";
 
@@ -1072,6 +1142,7 @@ public final class CarbonCommonConstants {
   /**
    * to enable offheap sort
    */
+  @CarbonProperty
   public static final String ENABLE_UNSAFE_SORT = "enable.unsafe.sort";
 
   /**
@@ -1082,21 +1153,22 @@ public final class CarbonCommonConstants {
   /**
    * to enable offheap sort
    */
+  @CarbonProperty
   public static final String ENABLE_OFFHEAP_SORT = "enable.offheap.sort";
 
   /**
    * to enable offheap sort
    */
   public static final String ENABLE_OFFHEAP_SORT_DEFAULT = "true";
-
+  @CarbonProperty
   public static final String ENABLE_INMEMORY_MERGE_SORT = "enable.inmemory.merge.sort";
 
   public static final String ENABLE_INMEMORY_MERGE_SORT_DEFAULT = "false";
-
+  @CarbonProperty
   public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB = "offheap.sort.chunk.size.inmb";
 
   public static final String OFFHEAP_SORT_CHUNK_SIZE_IN_MB_DEFAULT = "64";
-
+  @CarbonProperty
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB = "sort.inmemory.size.inmb";
 
   public static final String IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT = "1024";
@@ -1104,7 +1176,10 @@ public final class CarbonCommonConstants {
   /**
    * Sorts the data in batches and writes the batch data to store with index file.
    */
+  @CarbonProperty
   public static final String LOAD_SORT_SCOPE = "carbon.load.sort.scope";
+  @CarbonProperty
+  public static final String LOAD_USE_BATCH_SORT = "carbon.load.use.batch.sort";
 
   /**
    * If set to BATCH_SORT, the sorting scope is smaller and more index tree will be created,
@@ -1120,8 +1195,10 @@ public final class CarbonCommonConstants {
    * Size of batch data to keep in memory, as a thumb rule it supposed
    * to be less than 45% of sort.inmemory.size.inmb otherwise it may spill intermediate data to disk
    */
+  @CarbonProperty
   public static final String LOAD_BATCH_SORT_SIZE_INMB = "carbon.load.batch.sort.size.inmb";
-
+  public static final String LOAD_BATCH_SORT_SIZE_INMB_DEFAULT = "0";
+  @CarbonProperty
   /**
    * The Number of partitions to use when shuffling data for sort. If user don't configurate or
    * configurate it less than 1, it uses the number of map tasks as reduce tasks. In general, we
@@ -1130,7 +1207,7 @@ public final class CarbonCommonConstants {
   public static final String LOAD_GLOBAL_SORT_PARTITIONS = "carbon.load.global.sort.partitions";
 
   public static final String LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT = "0";
-
+  @CarbonProperty
   public static final String ENABLE_VECTOR_READER = "carbon.enable.vector.reader";
 
   public static final String ENABLE_VECTOR_READER_DEFAULT = "true";
@@ -1138,6 +1215,7 @@ public final class CarbonCommonConstants {
   /*
    * carbon dictionary server port
    */
+  @CarbonProperty
   public static final String DICTIONARY_SERVER_PORT = "carbon.dictionary.server.port";
 
   /**
@@ -1148,6 +1226,7 @@ public final class CarbonCommonConstants {
   /**
    * property to set is IS_DRIVER_INSTANCE
    */
+  @CarbonProperty
   public static final String IS_DRIVER_INSTANCE = "is.driver.instance";
 
   /**
@@ -1158,6 +1237,7 @@ public final class CarbonCommonConstants {
   /**
    * property for enabling unsafe based query processing
    */
+  @CarbonProperty
   public static final String ENABLE_UNSAFE_IN_QUERY_EXECUTION = "enable.unsafe.in.query.processing";
 
   /**
@@ -1168,6 +1248,7 @@ public final class CarbonCommonConstants {
   /**
    * property for offheap based processing
    */
+  @CarbonProperty
   public static final String USE_OFFHEAP_IN_QUERY_PROCSSING = "use.offheap.in.query.processing";
 
   /**
@@ -1178,6 +1259,7 @@ public final class CarbonCommonConstants {
   /**
    * whether to prefetch data while loading.
    */
+  @CarbonProperty
   public static final String USE_PREFETCH_WHILE_LOADING = "carbon.loading.prefetch";
 
   /**
@@ -1190,17 +1272,17 @@ public final class CarbonCommonConstants {
   public static final String MAJOR = "major";
 
   public static final String LOCAL_FILE_PREFIX = "file://";
-
+  @CarbonProperty
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION = "carbon.custom.block.distribution";
   public static final String CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT = "false";
 
   public static final int DICTIONARY_DEFAULT_CARDINALITY = 1;
-
+  @CarbonProperty
   public static final String SPARK_SCHEMA_STRING_LENGTH_THRESHOLD =
       "spark.sql.sources.schemaStringLengthThreshold";
 
   public static final int SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT = 4000;
-
+  @CarbonProperty
   public static final String CARBON_BAD_RECORDS_ACTION = "carbon.bad.records.action";
 
   public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
new file mode 100644
index 0000000..ed481bb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonLoadOptionConstants.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.constants;
+
+import org.apache.carbondata.core.util.CarbonProperty;
+
+/**
+ * Load options constant
+ */
+public final class CarbonLoadOptionConstants {
+  /**
+   * option to enable and disable the logger
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE =
+      "carbon.options.bad.records.logger.enable";
+
+  public static String CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT = "false";
+  /**
+   * property to pass the bad records action
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORDS_ACTION =
+      "carbon.options.bad.records.action";
+  /**
+   * load option to specify weather empty data to be treated as bad record
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD =
+      "carbon.options.is.empty.data.bad.record";
+  public static final String CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT = "false";
+
+  /**
+   * option to specify the load option
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_DATEFORMAT =
+      "carbon.options.dateformat";
+  public static final String CARBON_OPTIONS_DATEFORMAT_DEFAULT = "";
+  /**
+   * option to specify the sort_scope
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_SORT_SCOPE =
+      "carbon.options.sort.scope";
+  /**
+   * option to specify the batch sort size inmb
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BATCH_SORT_SIZE_INMB =
+      "carbon.options.batch.sort.size.inmb";
+  /**
+   * Option to enable/ disable single_pass
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_SINGLE_PASS =
+      "carbon.options.single.pass";
+  public static final String CARBON_OPTIONS_SINGLE_PASS_DEFAULT = "false";
+
+  /**
+   * specify bad record path option
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_BAD_RECORD_PATH =
+      "carbon.options.bad.record.path";
+  /**
+   * specify bad record path option
+   */
+  @CarbonProperty
+  public static final String CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS =
+      "carbon.options.global.sort.partitions";
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
index 0ce73f0..edc7b9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -16,6 +16,8 @@
  */
 package org.apache.carbondata.core.constants;
 
+import org.apache.carbondata.core.util.CarbonProperty;
+
 /**
  * Constants for V3 data format
  */
@@ -24,6 +26,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * each blocklet group size in mb
    */
+  @CarbonProperty
   String BLOCKLET_SIZE_IN_MB = "carbon.blockletgroup.size.in.mb";
 
   /**
@@ -39,6 +42,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * number of column to be read in one IO in query
    */
+  @CarbonProperty
   String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
 
   /**
@@ -59,6 +63,7 @@ public interface CarbonV3DataFormatConstants {
   /**
    * number of rows per blocklet column page
    */
+  @CarbonProperty
   String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java b/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
new file mode 100644
index 0000000..bef9576
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/exception/InvalidConfigurationException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.exception;
+
+import java.util.Locale;
+
+public class InvalidConfigurationException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public InvalidConfigurationException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public InvalidConfigurationException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param t
+   */
+  public InvalidConfigurationException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 0142e38..c1e70ff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -21,13 +21,15 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.lang.reflect.Field;
+import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
@@ -48,10 +50,7 @@ public final class CarbonProperties {
    */
   private Properties carbonProperties;
 
-  /**
-   * Added properties on the fly.
-   */
-  private Map<String, String> setProperties = new HashMap<>();
+  private Set<String> propertySet = new HashSet<String>();
 
   /**
    * Private constructor this will call load properties method to load all the
@@ -77,6 +76,11 @@ public final class CarbonProperties {
    * values in case of wrong values.
    */
   private void validateAndLoadDefaultProperties() {
+    try {
+      initPropertySet();
+    } catch (IllegalAccessException e) {
+      LOGGER.error("Illelagal access to declared field" + e.getMessage());
+    }
     if (null == carbonProperties.getProperty(CarbonCommonConstants.STORE_LOCATION)) {
       carbonProperties.setProperty(CarbonCommonConstants.STORE_LOCATION,
           CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL);
@@ -86,7 +90,6 @@ public final class CarbonProperties {
     validateNumCores();
     validateNumCoresBlockSort();
     validateSortSize();
-    validateBadRecordsLocation();
     validateHighCardinalityIdentify();
     validateHighCardinalityThreshold();
     validateCarbonDataFileVersion();
@@ -97,6 +100,27 @@ public final class CarbonProperties {
     validateNumberOfRowsPerBlockletColumnPage();
   }
 
+  private void initPropertySet() throws IllegalAccessException {
+    Field[] declaredFields = CarbonCommonConstants.class.getDeclaredFields();
+    for (Field field : declaredFields) {
+      if (field.isAnnotationPresent(CarbonProperty.class)) {
+        propertySet.add(field.get(field.getName()).toString());
+      }
+    }
+    declaredFields = CarbonV3DataFormatConstants.class.getDeclaredFields();
+    for (Field field : declaredFields) {
+      if (field.isAnnotationPresent(CarbonProperty.class)) {
+        propertySet.add(field.get(field.getName()).toString());
+      }
+    }
+    declaredFields = CarbonLoadOptionConstants.class.getDeclaredFields();
+    for (Field field : declaredFields) {
+      if (field.isAnnotationPresent(CarbonProperty.class)) {
+        propertySet.add(field.get(field.getName()).toString());
+      }
+    }
+  }
+
   private void validatePrefetchBufferSize() {
     String prefetchBufferSizeStr =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE);
@@ -202,15 +226,6 @@ public final class CarbonProperties {
     }
   }
 
-  private void validateBadRecordsLocation() {
-    String badRecordsLocation =
-        carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
-    if (null == badRecordsLocation || badRecordsLocation.length() == 0) {
-      carbonProperties.setProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL);
-    }
-  }
-
   /**
    * This method validates the blocklet size
    */
@@ -425,6 +440,12 @@ public final class CarbonProperties {
    * @return properties value
    */
   public String getProperty(String key) {
+    // get the property value from session parameters,
+    // if its null then get value from carbonProperties
+    String sessionPropertyValue = getSessionPropertyValue(key);
+    if (null != sessionPropertyValue) {
+      return sessionPropertyValue;
+    }
     //TODO temporary fix
     if ("carbon.leaf.node.size".equals(key)) {
       return "120000";
@@ -433,6 +454,25 @@ public final class CarbonProperties {
   }
 
   /**
+   * returns session property value
+   *
+   * @param key
+   * @return
+   */
+  private String getSessionPropertyValue(String key) {
+    String value = null;
+    CarbonSessionInfo carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo();
+    if (null != carbonSessionInfo) {
+      SessionParams sessionParams =
+          ThreadLocalSessionInfo.getCarbonSessionInfo().getSessionParams();
+      if (null != sessionParams) {
+        value = sessionParams.getProperty(key);
+      }
+    }
+    return value;
+  }
+
+  /**
    * This method will be used to get the properties value if property is not
    * present then it will return tghe default value
    *
@@ -454,26 +494,10 @@ public final class CarbonProperties {
    * @return properties value
    */
   public CarbonProperties addProperty(String key, String value) {
-    setProperties.put(key, value);
     carbonProperties.setProperty(key, value);
     return this;
   }
 
-  /**
-   * Get all the added properties.
-   * @return
-   */
-  public Map<String, String> getAddedProperies() {
-    return setProperties;
-  }
-
-  public void setProperties(Map<String, String> newProperties) {
-    setProperties.putAll(newProperties);
-    for (Map.Entry<String, String> entry : newProperties.entrySet()) {
-      carbonProperties.setProperty(entry.getKey(), entry.getValue());
-    }
-  }
-
   private ColumnarFormatVersion getDefaultFormatVersion() {
     return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
   }
@@ -748,4 +772,13 @@ public final class CarbonProperties {
     }
     return numberOfDeltaFilesThreshold;
   }
+
+  /**
+   * returns true if carbon property
+   * @param key
+   * @return
+   */
+  public boolean isCarbonProperty(String key) {
+    return propertySet.contains(key);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
new file mode 100644
index 0000000..2970a89
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperty.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * CarbonProperty Anotation
+ */
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CarbonProperty {
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
new file mode 100644
index 0000000..1a82f1d
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonSessionInfo.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.Serializable;
+
+/**
+ * This class maintains carbon session information details
+ */
+public class CarbonSessionInfo implements Serializable {
+
+  // contains carbon session param details
+  private SessionParams sessionParams;
+
+  public SessionParams getSessionParams() {
+    return sessionParams;
+  }
+
+  public void setSessionParams(SessionParams sessionParams) {
+    this.sessionParams = sessionParams;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 200d5ca..f409551 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -326,10 +327,13 @@ public final class CarbonUtil {
   }
 
   public static String getBadLogPath(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = CarbonProperties.getInstance()
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
-
     return badLogStoreLocation;
   }
 
@@ -1647,5 +1651,69 @@ public final class CarbonUtil {
         throw new IllegalArgumentException("Int cannot me more than 4 bytes");
     }
   }
+  /**
+   * Validate boolean value configuration
+   *
+   * @param value
+   * @return
+   */
+  public static boolean validateBoolean(String value) {
+    if (null == value) {
+      return false;
+    } else if (!("false".equalsIgnoreCase(value) || "true".equalsIgnoreCase(value))) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * validate the sort scope
+   * @param sortScope
+   * @return
+   */
+  public static boolean isValidSortOption(String sortScope) {
+    if (sortScope == null) {
+      return false;
+    }
+    switch (sortScope.toUpperCase()) {
+      case "BATCH_SORT":
+        return true;
+      case "LOCAL_SORT":
+        return true;
+      case "NO_SORT":
+        return true;
+      case "GLOBAL_SORT":
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * validate teh batch size
+   *
+   * @param value
+   * @return
+   */
+  public static boolean validateValidIntType(String value) {
+    if (null == value) {
+      return false;
+    }
+    try {
+      Integer.parseInt(value);
+    } catch (NumberFormatException nfe) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * is valid store path
+   * @param badRecordsLocation
+   * @return
+   */
+  public static boolean isValidBadStorePath(String badRecordsLocation) {
+    return !(null == badRecordsLocation || badRecordsLocation.length() == 0);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index 781b898..f06ba01 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -1,26 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.carbondata.core.util;
 
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
+import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.*;
+
 /**
- * Created by root1 on 19/5/17.
+ * This class maintains carbon session params
  */
 public class SessionParams implements Serializable {
 
-  protected transient CarbonProperties properties;
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CacheProvider.class.getName());
 
   private Map<String, String> sProps;
 
   public SessionParams() {
     sProps = new HashMap<>();
-    properties = CarbonProperties.getInstance();
-  }
-
-  public SessionParams(SessionParams sessionParams) {
-    this();
-    sProps.putAll(sessionParams.sProps);
   }
 
   /**
@@ -30,41 +52,90 @@ public class SessionParams implements Serializable {
    * @return properties value
    */
   public String getProperty(String key) {
-    String s = sProps.get(key);
-    if (key == null) {
-      s = properties.getProperty(key);
-    }
-    return s;
+    return sProps.get(key);
   }
 
   /**
-   * This method will be used to get the properties value if property is not
-   * present then it will return tghe default value
+   * This method will be used to add a new property
    *
    * @param key
    * @return properties value
    */
-  public String getProperty(String key, String defaultValue) {
-    String value = sProps.get(key);
-    if (key == null) {
-      value = properties.getProperty(key, defaultValue);
+  public SessionParams addProperty(String key, String value) throws InvalidConfigurationException {
+    boolean isValidConf = validateKeyValue(key, value);
+    if (isValidConf) {
+      LOGGER.audit("The key " + key + " with value " + value + " added in the session param");
+      sProps.put(key, value);
     }
-    return value;
+    return this;
   }
 
   /**
-   * This method will be used to add a new property
-   *
+   * validate the key value to be set using set command
    * @param key
-   * @return properties value
+   * @param value
+   * @return
+   * @throws InvalidConfigurationException
    */
-  public SessionParams addProperty(String key, String value) {
-    sProps.put(key, value);
-    return this;
+  private boolean validateKeyValue(String key, String value) throws InvalidConfigurationException {
+    boolean isValid = false;
+    switch (key) {
+      case ENABLE_UNSAFE_SORT:
+      case CARBON_CUSTOM_BLOCK_DISTRIBUTION:
+      case CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE:
+      case CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD:
+      case CARBON_OPTIONS_SINGLE_PASS:
+        isValid = CarbonUtil.validateBoolean(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
+        }
+        break;
+      case CARBON_OPTIONS_BAD_RECORDS_ACTION:
+        try {
+          LoggerAction.valueOf(value.toUpperCase());
+          isValid = true;
+        } catch (IllegalArgumentException iae) {
+          throw new InvalidConfigurationException(
+              "The key " + key + " can have only either FORCE or IGNORE or REDIRECT.");
+        }
+        break;
+      case CARBON_OPTIONS_SORT_SCOPE:
+        isValid = CarbonUtil.isValidSortOption(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("The sort scope " + key
+              + " can have only either BATCH_SORT or LOCAL_SORT or NO_SORT.");
+        }
+        break;
+      case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
+      case CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS:
+        isValid = CarbonUtil.validateValidIntType(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException(
+              "The configured value for key " + key + " must be valid integer.");
+        }
+        break;
+      case CARBON_OPTIONS_BAD_RECORD_PATH:
+        isValid = CarbonUtil.isValidBadStorePath(value);
+        if (!isValid) {
+          throw new InvalidConfigurationException("Invalid bad records location.");
+        }
+        break;
+      // no validation needed while set for CARBON_OPTIONS_DATEFORMAT
+      case CARBON_OPTIONS_DATEFORMAT:
+        isValid = true;
+        break;
+      default:
+        throw new InvalidConfigurationException(
+            "The key " + key + " not supported for dynamic configuration.");
+    }
+    return isValid;
   }
 
-  public void setProperties(Map<String, String> newProperties) {
-    sProps.putAll(newProperties);
+  /**
+   * clear the set properties
+   */
+  public void clear() {
+    sProps.clear();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
new file mode 100644
index 0000000..df525bc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+/**
+ * This class maintains ThreadLocal session params
+ */
+public class ThreadLocalSessionInfo {
+  static final InheritableThreadLocal<CarbonSessionInfo> threadLocal =
+      new InheritableThreadLocal<CarbonSessionInfo>();
+
+  public static void setCarbonSessionInfo(CarbonSessionInfo carbonSessionInfo) {
+    threadLocal.set(carbonSessionInfo);
+  }
+
+  public static CarbonSessionInfo getCarbonSessionInfo() {
+    return threadLocal.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
deleted file mode 100644
index 354a0ee..0000000
--- a/core/src/main/java/org/apache/carbondata/core/util/ThreadLocalSessionParams.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.util;
-
-/**
- * This class maintains ThreadLocal session params
- */
-public class ThreadLocalSessionParams {
-  static final InheritableThreadLocal<SessionParams> threadLocal =
-      new InheritableThreadLocal<SessionParams>();
-
-  public static void setSessionParams(SessionParams sessionParams) {
-    threadLocal.set(sessionParams);
-  }
-
-  public static SessionParams getSessionParams() {
-    return threadLocal.get();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
index 6e6f2bd..9aa1188 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java
@@ -23,8 +23,10 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.CarbonProjection;
@@ -51,7 +53,10 @@ public class CarbonInputMapperTest extends TestCase {
 
   // changed setUp to static init block to avoid un wanted multiple time store creation
   static {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
+
   }
 
   @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
index 60fee95..bf347c5 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/InputFilesTest.java
@@ -23,6 +23,9 @@ import java.util.List;
 import java.util.UUID;
 
 import junit.framework.TestCase;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.CarbonInputFormat;
 import org.apache.carbondata.hadoop.test.util.StoreCreator;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +40,8 @@ import org.junit.Test;
 public class InputFilesTest extends TestCase {
   @Before
   public void setUp() throws Exception {
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
     // waiting 3s to finish table create and data loading
     Thread.sleep(3000L);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
deleted file mode 100644
index 28e2dbf..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/commands/SetCommandTestCase.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.testsuite.commands
-
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.core.util.CarbonProperties
-
-class SetCommandTestCase  extends QueryTest with BeforeAndAfterAll {
-
-  test("test set command") {
-
-    sql("set key1=value1")
-
-    assert(CarbonProperties.getInstance().getProperty("key1").equals("value1"), "Set command does not work" )
-    assert(sqlContext.getConf("key1").equals("value1"), "Set command does not work" )
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 2842a16..3f5be84 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -338,9 +338,9 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
       .addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
         CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
 
-    sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = ${CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT}")
-    sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = " +
-      s"${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT}")
+    // sql(s"SET ${CarbonCommonConstants.LOAD_SORT_SCOPE} = ${CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT}")
+    // sql(s"SET ${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS} = " +
+    //  s"${CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT}")
   }
 
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index 6fb11b3..4ccd49e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -71,12 +71,12 @@ class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterA
     try {
       sql(s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/timeStampFormatData1.csv' into table t3
-           OPTIONS('dateformat' = '')
+           OPTIONS('dateformat' = 'date')
            """)
       assert(false)
     } catch {
       case ex: MalformedCarbonCommandException =>
-        assertResult(ex.getMessage)("Error: Option DateFormat is set an empty string.")
+        assertResult(ex.getMessage)("Error: Option DateFormat is not provided for Column date.")
       case _: Throwable=> assert(false)
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
index ae951bd..f2a4a7d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
@@ -28,11 +28,8 @@ import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 object ValidateUtil {
   def validateDateFormat(dateFormat: String, table: CarbonTable, tableName: String): Unit = {
     val dimensions = table.getDimensionByTableName(tableName).asScala
-    if (dateFormat != null) {
-      if (dateFormat.trim == "") {
-        throw new MalformedCarbonCommandException("Error: Option DateFormat is set an empty " +
-          "string.")
-      } else {
+    // allowing empty value to be configured for dateformat option.
+    if (dateFormat != null && dateFormat.trim != "") {
         val dateFormats: Array[String] = dateFormat.split(CarbonCommonConstants.COMMA)
         for (singleDateFormat <- dateFormats) {
           val dateFormatSplits: Array[String] = singleDateFormat.split(":", 2)
@@ -49,7 +46,6 @@ object ValidateUtil {
           }
         }
       }
-    }
   }
 
   def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index e00dd0f..106a9fd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 
 /**
  * This RDD maintains session level ThreadLocal
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
 abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
 
-  val sessionParams: SessionParams = ThreadLocalSessionParams.getSessionParams
+  val carbonSessionInfo: CarbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
@@ -40,7 +40,7 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
   def internalCompute(split: Partition, context: TaskContext): Iterator[T]
 
   final def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     internalCompute(split, context)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 5e37f63..383d308 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataType
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
@@ -827,7 +827,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     val supportedOptions = Seq("DELIMITER", "QUOTECHAR", "FILEHEADER", "ESCAPECHAR", "MULTILINE",
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
-      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT",
+      "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "DATEFORMAT", "BAD_RECORD_PATH",
       "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD", "SORT_SCOPE", "BATCH_SORT_SIZE_INMB",
       "GLOBAL_SORT_PARTITIONS"
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index a01ccb2..b76bca3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -54,6 +54,7 @@ object TestQueryExecutor {
   CarbonProperties.getInstance()
     .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
     .addProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+    .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
       .iterator().next().getClass

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 65235e6..3579b8a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel,
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
@@ -53,7 +54,6 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index d085ad7..ba22c3c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -36,6 +36,7 @@ import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -412,7 +413,16 @@ case class LoadTable(
       val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
       val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
-
+      val bad_record_path = options.getOrElse("bad_record_path",
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+      if (badRecordsLoggerEnable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentchar, "#"))
@@ -730,13 +740,6 @@ private[sql] case class DropTableCommand(ifExistsSet: Boolean, databaseNameOp: O
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
         }
-        // delete bad record log after drop table
-        val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
-        val badLogFileType = FileFactory.getFileType(badLogPath)
-        if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-          val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
       }
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 48af516..5c20808 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel,
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
@@ -53,7 +54,6 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.LoggerAction
 import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 7c096d3..d28044f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.processing.merger.TableMeta
@@ -53,8 +53,8 @@ case class CarbonDatasourceHadoopRelation(
       absIdentifier.getCarbonTableIdentifier.getTableName)(sparkSession)
     .asInstanceOf[CarbonRelation]
 
-  val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
-  ThreadLocalSessionParams.setSessionParams(sessionParams)
+  val carbonSessionInfo : CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+  ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   override def schema: StructType = tableSchema.getOrElse(carbonRelation.schema)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 78820ea..925b82b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.internal.CarbonSQLConf
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
@@ -38,6 +38,8 @@ class CarbonEnv {
 
   var sessionParams: SessionParams = _
 
+  var carbonSessionInfo: CarbonSessionInfo = _
+
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   // set readsupport class global so that the executor can get it.
@@ -48,8 +50,10 @@ class CarbonEnv {
   def init(sparkSession: SparkSession): Unit = {
     sparkSession.udf.register("getTupleId", () => "")
     if (!initialized) {
+      carbonSessionInfo = new CarbonSessionInfo()
       sessionParams = new SessionParams()
-      ThreadLocalSessionParams.setSessionParams(sessionParams)
+      carbonSessionInfo.setSessionParams(sessionParams)
+      ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
       val config = new CarbonSQLConf(sparkSession)
       if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
         config.addDefaultCarbonParams()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
index a4feead..d2022be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonHiveCommands.scala
@@ -49,7 +49,7 @@ case class CarbonSetCommand(command: SetCommand)
   override val output = command.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val sessionParms = CarbonEnv.getInstance(sparkSession).sessionParams
+    val sessionParms = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
     command.kv match {
       case Some((key, Some(value))) =>
         val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
@@ -68,7 +68,7 @@ case class CarbonResetCommand()
   override val output = ResetCommand.output
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    CarbonEnv.getInstance(sparkSession).sessionParams.clear()
+    CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams.clear()
     ResetCommand.run(sparkSession)
   }
 }