Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/06/27 10:52:59 UTC

[3/5] carbondata git commit: 1. Refactored the bad record code: by default the bad record path is empty; if the bad record logger is enabled or the action is REDIRECT and no bad record path is configured, the data load fails. 2. Support dynamic set
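
For reference, a minimal sketch of the behaviour this change introduces, adapted from the
BadRecordPathLoadOptionTest added below (the table name, csv path and target directory are
illustrative, not taken verbatim from production code):

    // Set the bad record path dynamically for the session (the new "dynamic set" support).
    sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH}=/tmp/carbon/badrecords")
    // With the logger enabled or the action REDIRECT, the load fails unless a valid path is set.
    sql("LOAD DATA LOCAL INPATH '/path/to/datasample.csv' INTO TABLE salestest " +
        "OPTIONS('bad_records_logger_enable'='true', 'bad_records_action'='redirect')")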

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 0064c21..f9f556d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -35,9 +35,10 @@ import org.apache.spark.util.FileUtils
 import org.codehaus.jackson.map.ObjectMapper
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -359,7 +360,8 @@ case class LoadTable(
       sys.error(s"Data loading failed. table not found: $dbName.$tableName")
     }
 
-    CarbonProperties.getInstance().addProperty("zookeeper.enable.lock", "false")
+    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
+    carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
         .getCarbonTableIdentifier,
@@ -407,31 +409,60 @@ case class LoadTable(
       val commentChar = options.getOrElse("commentchar", "#")
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
-      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordActionValue = CarbonProperties.getInstance()
+      val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable",
+        carbonProperty
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+            CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT))
+      val badRecordActionValue = carbonProperty
         .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
-      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
+      val badRecordsAction = options.getOrElse("bad_records_action", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue))
+      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", carbonProperty
+        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT))
       val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
-      val dateFormat = options.getOrElse("dateformat", null)
+      val dateFormat = options.getOrElse("dateformat",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT))
       ValidateUtil.validateDateFormat(dateFormat, table, tableName)
       val maxColumns = options.getOrElse("maxcolumns", null)
-      val sortScope = options.getOrElse("sort_scope", null)
+      val sortScope = options
+        .getOrElse("sort_scope",
+          carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+            carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+              CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))
       ValidateUtil.validateSortScope(table, sortScope)
-      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb", null)
-      val globalSortPartitions = options.getOrElse("global_sort_partitions", null)
+      val batchSortSizeInMB = options.getOrElse("batch_sort_size_inmb",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)))
+      val bad_record_path = options.getOrElse("bad_record_path",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)))
+      if (badRecordsLoggerEnable.toBoolean ||
+          LoggerAction.REDIRECT.name().equalsIgnoreCase(badRecordsAction)) {
+        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+          sys.error("Invalid bad records location.")
+        }
+      }
+      carbonLoadModel.setBadRecordsLocation(bad_record_path)
+      val globalSortPartitions = options.getOrElse("global_sort_partitions",
+        carbonProperty
+          .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null))
       ValidateUtil.validateGlobalSortPartitions(globalSortPartitions)
       carbonLoadModel.setEscapeChar(checkDefaultValue(escapeChar, "\\"))
       carbonLoadModel.setQuoteChar(checkDefaultValue(quoteChar, "\""))
       carbonLoadModel.setCommentChar(checkDefaultValue(commentChar, "#"))
       carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
+      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
         CarbonCommonConstants.CARBON_DATE_FORMAT,
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       carbonLoadModel
@@ -449,7 +480,9 @@ case class LoadTable(
       carbonLoadModel.setSortScope(sortScope)
       carbonLoadModel.setBatchSortSizeInMb(batchSortSizeInMB)
       carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
-      val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
+      val useOnePass = options.getOrElse("single_pass",
+        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
         case "true" =>
           true
         case "false" =>
@@ -534,7 +567,7 @@ case class LoadTable(
                 allDictionaryPath)
           }
           // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = CarbonProperties.getInstance()
+          val dictionaryServerPort = carbonProperty
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
@@ -776,13 +809,6 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
             CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
           }
         }
-        // delete bad record log after drop table
-        val badLogPath = CarbonUtil.getBadLogPath(dbName + File.separator + tableName)
-        val badLogFileType = FileFactory.getFileType(badLogPath)
-        if (FileFactory.isFileExist(badLogPath, badLogFileType)) {
-          val file = FileFactory.getCarbonFile(badLogPath, badLogFileType)
-          CarbonUtil.deleteFoldersAndFiles(file)
-        }
       }
     }
     Seq.empty
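
In short, each load option above now resolves in the order: LOAD statement option, then the
session-level carbon.options.* property, then the carbonProperties entry, then the hard-coded
default. A rough sketch of that lookup chain (the helper name is hypothetical; it assumes a
carbonProperty: CarbonProperties value in scope, as in the code above):

    // Hypothetical helper mirroring the fallback chain used for bad_record_path, sort_scope, etc.
    def resolveOption(options: Map[String, String], optionKey: String,
        sessionKey: String, systemKey: String, default: String): String = {
      options.getOrElse(optionKey,
        carbonProperty.getProperty(sessionKey,
          carbonProperty.getProperty(systemKey, default)))
    }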

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
new file mode 100644
index 0000000..51b29a1
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/internal/CarbonSqlConf.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.internal
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Initializes the default values for the dynamic (session-settable) Carbon parameters.
+ */
+class CarbonSQLConf(sparkSession: SparkSession) {
+
+  val carbonProperties = CarbonProperties.getInstance()
+
+  /**
+   * Registers the dynamic parameter defaults along with their usage docs.
+   */
+  def addDefaultCarbonParams(): Unit = {
+    val ENABLE_UNSAFE_SORT =
+      SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
+        .doc("To enable/ disable unsafe sort.")
+        .booleanConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
+      SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
+        .doc("To enable/ disable carbon custom block distribution.")
+        .booleanConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+            CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    val BAD_RECORDS_LOGGER_ENABLE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
+        .doc("To enable/ disable carbon bad record logger.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    val BAD_RECORDS_ACTION =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
+        .doc("To configure the bad records action.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    val IS_EMPTY_DATA_BAD_RECORD =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
+        .doc("Property to decide whether empty data is to be considered a bad/good record.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
+          .toBoolean)
+    val SORT_SCOPE =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
+        .doc("Property to specify sort scope.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    val BATCH_SORT_SIZE_INMB =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
+        .doc("Property to specify batch sort size in MB.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    val SINGLE_PASS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
+        .doc("Property to enable/disable single_pass.")
+        .booleanConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    val BAD_RECORD_PATH =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
+        .doc("Property to configure the bad record location.")
+        .stringConf
+        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    val GLOBAL_SORT_PARTITIONS =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
+        .doc("Property to configure the global sort partitions.")
+        .stringConf
+        .createWithDefault(carbonProperties
+          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    val DATEFORMAT =
+      SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
+        .doc("Property to configure the date format for date type columns.")
+        .stringConf
+        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+
+  /**
+   * Sets the default values of the dynamic properties in the Spark session.
+   */
+  def addDefaultCarbonSessionParams(): Unit = {
+    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+      carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
+          CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION_DEFAULT).toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
+    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
+  }
+}
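
A brief usage sketch of the dynamic set support that CarbonSQLConf registers above, mirroring the
SetCommandTestCase added later in this commit (the values shown are taken from that test; anything
beyond it is an assumption):

    // Load options can now be overridden per session through SET commands ...
    sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT")
    sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true")
    // ... invalid values raise InvalidConfigurationException, and RESET restores the defaults.
    sql("reset")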

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 3412fb0..41d6bd3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.SparkSqlAstBuilder
 import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, PartitionerField, TableModel}
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 
-import org.apache.carbondata.core.util.{SessionParams, ThreadLocalSessionParams}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
@@ -43,8 +43,8 @@ class CarbonSparkSqlParser(conf: SQLConf, sparkSession: SparkSession) extends Ab
   private val substitutor = new VariableSubstitution(conf)
 
   override def parsePlan(sqlText: String): LogicalPlan = {
-    val sessionParams : SessionParams = CarbonEnv.getInstance(sparkSession).sessionParams
-    ThreadLocalSessionParams.setSessionParams(sessionParams)
+    val carbonSessionInfo: CarbonSessionInfo = CarbonEnv.getInstance(sparkSession).carbonSessionInfo
+    ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
     try {
       super.parsePlan(sqlText)
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
new file mode 100644
index 0000000..846c4b6
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/BadRecordPathLoadOptionTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class to verify that the bad record log and csv files are written
+ * at the configured bad record path.
+ */
+class BadRecordPathLoadOptionTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+  var badRecordPath: String = null
+  override def beforeAll {
+    badRecordPath = new File("./target/test/badRecords")
+      .getCanonicalPath.replaceAll("\\\\", "/")
+    sql("drop table IF EXISTS salestest")
+  }
+
+  test("data load log file and csv file written at the configured location") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS salestest(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
+    sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH}=${badRecordPath}")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE salestest OPTIONS" +
+        "('bad_records_logger_enable'='true','bad_records_action'='redirect', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"')")
+    val location: Boolean = isFilesWrittenAtBadStoreLocation
+    assert(location)
+  }
+
+  override def afterAll {
+    sql("drop table salestest")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+  def isFilesWrittenAtBadStoreLocation: Boolean = {
+    val badStorePath = badRecordPath + "/default/salestest/0/0"
+    val carbonFile: CarbonFile = FileFactory
+      .getCarbonFile(badStorePath, FileFactory.getFileType(badStorePath))
+    var exists: Boolean = carbonFile.exists()
+    if (exists) {
+      val listFiles: Array[CarbonFile] = carbonFile.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(".log") || file.getName.endsWith(".csv")
+        }
+      })
+      exists = listFiles.size > 0
+    }
+    exists
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index 5e91574..6f57cd6 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -238,7 +238,6 @@ class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
           CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
     }
   }
-
   //
   override def afterAll {
     sql("drop table IF EXISTS data_pm")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
new file mode 100644
index 0000000..18b4039
--- /dev/null
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/commands/SetCommandTestCase.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.carbondata.commands
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
+import org.apache.carbondata.core.exception.InvalidConfigurationException
+
+class SetCommandTestCase extends QueryTest with BeforeAndAfterAll{
+  override def beforeAll: Unit = {
+    sql("set carbon=true")
+  }
+  test("test set command") {
+    checkAnswer(sql("set"), sql("set"))
+  }
+
+  test("test set any value command") {
+    checkAnswer(sql("set carbon=false"), sql("set carbon"))
+  }
+
+  test("test set command for enable.unsafe.sort=true") {
+    checkAnswer(sql("set enable.unsafe.sort=true"), sql("set enable.unsafe.sort"))
+  }
+
+  test("test set command for enable.unsafe.sort for invalid option") {
+    try {
+      checkAnswer(sql("set enable.unsafe.sort=123"), sql("set enable.unsafe.sort"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //is_empty_data_bad_record
+  test(s"test set command for" +
+       s" ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+    }=true"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+  }
+
+  test(s"test set command for ${
+    CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${
+        CarbonLoadOptionConstants
+          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE
+      }=123"), sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE }"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  test(s"test set command for ${
+    CarbonLoadOptionConstants
+      .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+  }=true") {
+    checkAnswer(sql(s"set ${
+      CarbonLoadOptionConstants
+        .CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD
+    }=true"),
+      sql(s"set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD }"))
+  }
+
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD} " +
+       s"for invalid option") {
+    try {
+      checkAnswer(
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  //carbon.custom.block.distribution
+  test("test set command for carbon.custom.block.distribution=true") {
+    checkAnswer(sql("set carbon.custom.block.distribution=true"),
+      sql("set carbon.custom.block.distribution"))
+  }
+
+  test("test set command for carbon.custom.block.distribution for invalid option") {
+    try {
+      checkAnswer(sql("set carbon.custom.block.distribution=123"),
+        sql("set carbon.custom.block.distribution"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // sort_scope
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=LOCAL_SORT"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+  }
+
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // batch_sort_size_inmb
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=4"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+  }
+
+  test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}=hjf"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  // single_pass
+  test(s"test set command for ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true") {
+    checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true"),
+      sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+  }
+
+  test(s"test set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS} for invalid option") {
+    try {
+      checkAnswer(sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=123"),
+        sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
+      assert(false)
+    } catch {
+      case ex: InvalidConfigurationException =>
+        assert(true)
+    }
+  }
+  override def afterAll {
+    sql("reset")
+    sql("set carbon=true")
+    checkAnswer(sql("set carbon"),
+      sql("set"))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java b/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
deleted file mode 100644
index 901df3b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/LoggerAction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-/**
- * enum to hold the bad record logger action
- */
-public enum LoggerAction {
-
-  FORCE("FORCE"), // data will be converted to null
-  REDIRECT("REDIRECT"), // no null conversion moved to bad record and written to raw csv
-  IGNORE("IGNORE"), // no null conversion moved to bad record and not written to raw csv
-  FAIL("FAIL");  //data loading will fail if a bad record is found
-  private String name;
-
-  LoggerAction(String name) {
-    this.name = name;
-  }
-
-  @Override public String toString() {
-    return this.name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 7ec7933..bfc1be9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -158,6 +158,10 @@ public class CarbonLoadModel implements Serializable {
    * Batch sort size in mb.
    */
   private String batchSortSizeInMb;
+  /**
+   * bad record location
+   */
+  private String badRecordsLocation;
 
   /**
    * Number of partitions in global sort.
@@ -363,6 +367,7 @@ public class CarbonLoadModel implements Serializable {
     copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copy.sortScope = sortScope;
     copy.batchSortSizeInMb = batchSortSizeInMb;
+    copy.badRecordsLocation = badRecordsLocation;
     return copy;
   }
 
@@ -464,6 +469,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
     copyObj.sortScope = sortScope;
     copyObj.batchSortSizeInMb = batchSortSizeInMb;
+    copyObj.badRecordsLocation = badRecordsLocation;
     return copyObj;
   }
 
@@ -764,4 +770,12 @@ public class CarbonLoadModel implements Serializable {
   public void setGlobalSortPartitions(String globalSortPartitions) {
     this.globalSortPartitions = globalSortPartitions;
   }
+
+  public String getBadRecordsLocation() {
+    return badRecordsLocation;
+  }
+
+  public void setBadRecordsLocation(String badRecordsLocation) {
+    this.badRecordsLocation = badRecordsLocation;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 3294d5f..5662a04 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -180,6 +181,8 @@ public final class DataLoadProcessBuilder {
         loadModel.getBatchSortSizeInMb());
     configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
         loadModel.getGlobalSortPartitions());
+    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        loadModel.getBadRecordsLocation());
     CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getFactTableName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
index 1cc043f..2bf8e16 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortScopeOptions.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.processing.newflow.sort;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Sort scope options
@@ -43,21 +44,7 @@ public class SortScopeOptions {
   }
 
   public static boolean isValidSortOption(String sortScope) {
-    if (sortScope == null) {
-      return false;
-    }
-    switch (sortScope.toUpperCase()) {
-      case "BATCH_SORT":
-        return true;
-      case "LOCAL_SORT":
-        return true;
-      case "GLOBAL_SORT":
-        return true;
-      case "NO_SORT":
-        return true;
-      default:
-        return false;
-    }
+    return CarbonUtil.isValidSortOption(sortScope);
   }
 
   public enum SortScope {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 000d0b9..62d6c94 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -24,11 +24,12 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -152,16 +153,22 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
-        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
-        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
-            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
-            + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(configuration,
+            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
+                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
         badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
   }
 
-  public static String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
+      String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -198,7 +205,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     // rename the bad record in progress to normal
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
-    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
         identifier.getDatabaseName() + File.separator + identifier.getTableName()
             + File.separator + configuration.getSegmentId() + File.separator + configuration
             .getTaskNo());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index d6185ba..c6f83ed 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -24,13 +24,14 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -41,6 +42,7 @@ import org.apache.carbondata.processing.newflow.partition.Partitioner;
 import org.apache.carbondata.processing.newflow.partition.impl.HashPartitionerImpl;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -187,8 +189,12 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   }
 
   private String getBadLogStoreLocation(String storeLocation) {
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     return badLogStoreLocation;
@@ -200,6 +206,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
       super.close();
       if (null != badRecordLogger) {
         badRecordLogger.closeStreams();
+        renameBadRecord(configuration);
       }
       if (converters != null) {
         for (RowConverter converter : converters) {
@@ -208,7 +215,15 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
       }
     }
   }
-
+  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + identifier.getTableName()
+            + File.separator + configuration.getSegmentId() + File.separator + configuration
+            .getTaskNo());
+  }
   @Override protected String getStepName() {
     return "Data Converter with Bucketing";
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 84e1f20..62f13db 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -90,12 +91,18 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * @param configuration
    * @param storeLocation
    */
-  public static void renameBadRecordsFromInProgressToNormal(String storeLocation) {
+  public static void renameBadRecordsFromInProgressToNormal(
+      CarbonDataLoadConfiguration configuration, String storeLocation) {
     // get the base store location
-    String badLogStoreLocation =
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
     badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
 
     FileType fileType = FileFactory.getFileType(badLogStoreLocation);
@@ -466,7 +473,8 @@ public final class CarbonDataProcessorUtil {
       if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)
           == null) {
         batchSortSizeInMb = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, "0"));
+            .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT));
       } else {
         batchSortSizeInMb = Integer.parseInt(
             configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/39644b5e/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index d5a4f02..fdbd2f8 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -59,8 +59,9 @@ public class BlockIndexStoreTest extends TestCase {
 
   @BeforeClass public void setUp() {
 	property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
-	
 	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+    CarbonProperties.getInstance().
+        addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords");
     StoreCreator.createCarbonStore();
     CarbonProperties.getInstance().
         addProperty(CarbonCommonConstants.CARBON_MAX_DRIVER_LRU_CACHE_SIZE, "10");