You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/17 07:48:47 UTC

[1/2] incubator-carbondata git commit: [CARBONDATA-784] Bad record making configurable empty data not a bad record.

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master b8f1d3ec6 -> d4f07ae93


[CARBONDATA-784] Bad record making configurable empty data not a bad record.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3251c894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3251c894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3251c894

Branch: refs/heads/master
Commit: 3251c8941c2ba088cfcf3bd9ca9e6a7563e6bf67
Parents: b8f1d3e
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Wed Mar 8 15:34:00 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 17 13:08:03 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +-
 .../badrecordloger/BadRecordEmptyDataTest.scala | 167 +++++++++++++++++++
 .../badrecordloger/BadRecordLoggerTest.scala    |   6 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  11 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../execution/command/carbonTableSchema.scala   |  12 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../execution/command/carbonTableSchema.scala   |  13 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   1 +
 .../processing/csvload/CSVInputFormat.java      |   1 +
 .../csvreaderstep/UnivocityCsvParser.java       |   1 +
 .../processing/model/CarbonLoadModel.java       |  15 ++
 .../newflow/DataLoadProcessBuilder.java         |   2 +
 .../constants/DataLoadProcessorConstants.java   |   2 +
 .../newflow/converter/BadRecordLogHolder.java   |  10 ++
 .../DirectDictionaryFieldConverterImpl.java     |  15 +-
 .../converter/impl/FieldEncoderFactory.java     |  10 +-
 .../impl/MeasureFieldConverterImpl.java         |  15 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |  16 +-
 .../converter/impl/RowConverterImpl.java        |  28 ++--
 .../steps/DataConverterProcessorStepImpl.java   |  14 +-
 ...ConverterProcessorWithBucketingStepImpl.java |   8 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   7 +
 .../store/CarbonFactDataHandlerModel.java       |  11 ++
 25 files changed, 347 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ff257dd..853d3b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -185,8 +185,7 @@ public final class CarbonCommonConstants {
   /**
    * CARBON_BADRECORDS_LOCATION_DEFAULT
    */
-  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL =
-      "../unibi-solutions/system/carbon/badRecords";
+  public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "/tmp/carbon/badRecords";
   /**
    * HIERARCHY_FILE_EXTENSION
    */
@@ -1164,6 +1163,10 @@ public final class CarbonCommonConstants {
 
   public static final int SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT = 4000;
 
+  public static final String CARBON_BAD_RECORDS_ACTION = "carbon.bad.records.action";
+
+  public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
new file mode 100644
index 0000000..37dc26d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.badrecordloger
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test class for bad record handling of empty data (IS_EMPTY_DATA_BAD_RECORD load option)
+ *
+ *
+ */
+class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll {
+    try {
+      sql("drop table IF EXISTS emptyColumnValues")
+      sql("drop table IF EXISTS emptyColumnValues_false")
+      sql("drop table IF EXISTS empty_timestamp")
+      sql("drop table IF EXISTS empty_timestamp_false")
+      sql("drop table IF EXISTS dataloadOptionTests")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+          new File("./target/test/badRecords")
+            .getCanonicalPath)
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+      var csvFilePath = ""
+
+      // 1. empty data for string data type - take empty value
+      // 2. empty data for non-string data type - Bad records/Null value based on configuration
+      // with IS_EMPTY_DATA_BAD_RECORD=false, the count test below expects 7 rows in this table.
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+          +
+          "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='false'," +
+          " 'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS emptyColumnValues_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+          + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+      // 4.1 Time stamp empty data - Bad records/Null value based on configuration
+      // 5. non-parsable data - Bad records/Null value based on configuration
+      // 6. empty line(check current one) - Bad records/Null value based on configuration
+      // only one value should be loaded.
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+          +
+          "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='false' ," +
+          "'bad_records_action'='ignore', " +
+          "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+      // load with bad_records_logger_enable to false
+      sql(
+        """CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+        """)
+      csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+          + "('bad_records_logger_enable'='false','IS_EMPTY_DATA_BAD_RECORD'='false'," +
+          " 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+    } catch {
+      case x: Throwable => CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    }
+  }
+
+   test("select count(*) from empty_timestamp") {
+    checkAnswer(
+      sql("select count(*) from empty_timestamp"),
+      Seq(Row(2)
+      )
+    )
+  }
+
+  test("select count(*) from emptyColumnValues") {
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("select count(*) from emptyColumnValues_false") {
+    checkAnswer(
+      sql("select count(*) from emptyColumnValues_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("select count(*) from empty_timestamp_false") {
+    checkAnswer(
+      sql("select count(*) from empty_timestamp_false"),
+      Seq(Row(7)
+      )
+    )
+  }
+
+  test("test load ddl command") {
+    sql(
+      """CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
+           String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+      """)
+    val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+    try {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+          + "('IS_EMPTY_DATA_BAD_RECORD'='xyz', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+    } catch {
+      case ex: Exception =>
+        assert("option IS_EMPTY_DATA_BAD_RECORD can have option either true or false"
+          .equals(ex.getMessage))
+    }
+  }
+
+  override def afterAll {
+    sql("drop table emptyColumnValues")
+    sql("drop table emptyColumnValues_false")
+    sql("drop table empty_timestamp")
+    sql("drop table empty_timestamp_false")
+    sql("drop table dataloadOptionTests")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 125c819..5716978 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -113,7 +113,8 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
       csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
           +
-          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true'," +
+          " 'bad_records_action'='ignore', " +
           "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
       // load with bad_records_logger_enable to false
       sql(
@@ -137,7 +138,8 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
       csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
           +
-          "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+          "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true' ," +
+          "'bad_records_action'='ignore', " +
           "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
       // load with bad_records_logger_enable to false
       sql(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 56f6e6d..9c5ce83 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -84,6 +84,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
   protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
   protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
+  protected val IS_EMPTY_DATA_BAD_RECORD = carbonKeyWord("IS_EMPTY_DATA_BAD_RECORD")
+  protected val IS_EMPTY_COMMA_DATA_BAD_RECORD = carbonKeyWord("IS_NULL_DATA_BAD_RECORD")
   protected val FILES = carbonKeyWord("FILES")
   protected val FROM = carbonKeyWord("FROM")
   protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
@@ -750,7 +752,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       "COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
       "SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
       "ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT",
-      "SINGLE_PASS"
+      "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD"
     )
     var isSupported = true
     val invalidOptions = StringBuilder.newBuilder
@@ -797,6 +799,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
             "option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT")
       }
     }
+    if (options.exists(_._1.equalsIgnoreCase("IS_EMPTY_DATA_BAD_RECORD"))) {
+      val optionValue: String = options.get("is_empty_data_bad_record").get.head._2
+      if (!("true".equalsIgnoreCase(optionValue) || "false".equalsIgnoreCase(optionValue))) {
+        throw new MalformedCarbonCommandException(
+          "option IS_EMPTY_DATA_BAD_RECORD can have option either true or false")
+      }
+    }
 
     // check for duplicate options
     val duplicateOptions = options filter {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index f284345..f8e2bc1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.util.Utils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 
 /**
  * the sql executor of spark-common-test
@@ -50,7 +52,8 @@ object TestQueryExecutor {
   val timestampFormat = "dd-MM-yyyy"
 
   val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
-
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
   private def lookupQueryExecutor: Class[_] = {
     ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
       .iterator().next().getClass

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index fe92aad..306d277 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -973,8 +973,13 @@ object CarbonDataRDDFactory {
 
         shutDownDictionaryServer(carbonLoadModel, result)
 
-        LOGGER.audit("Data load is successful for " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
+          LOGGER.audit("Data load is partially successful for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        } else {
+          LOGGER.audit("Data load is successful for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        }
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 62463fe..6f14feb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
@@ -416,7 +417,11 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+      val badRecordActionValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
+      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
       val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
@@ -443,7 +448,10 @@ case class LoadTable(
           TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
       carbonLoadModel
         .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
+          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
+      carbonLoadModel
+        .setIsEmptyDataBadRecord(
+          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
       // when single_pass=true, and use_kettle=false, and not use all dict
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 771e455..eb9e88b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -970,8 +970,13 @@ object CarbonDataRDDFactory {
         }
 
         shutdownDictionaryServer(carbonLoadModel, result)
-        LOGGER.audit("Data load is successful for " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
+          LOGGER.audit("Data load is partially successful for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        } else {
+          LOGGER.audit("Data load is successful for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        }
         try {
           // compaction handling
           handleSegmentMerging(tableCreationTime)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 2b75c56..002b6f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.format.SchemaEvolutionEntry
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
@@ -418,7 +419,11 @@ case class LoadTable(
       val columnDict = options.getOrElse("columndict", null)
       val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
       val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
-      val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+      val badRecordActionValue = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+          CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+      val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
+      val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
       val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
       val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
       val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
@@ -444,8 +449,10 @@ case class LoadTable(
           TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
       carbonLoadModel
         .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
-
+          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
+      carbonLoadModel
+        .setIsEmptyDataBadRecord(
+          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>
           if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index b8f715f..15d5597 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -43,6 +43,7 @@ object Spark2TestQueryExecutor {
     .addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
       System.getProperty("java.io.tmpdir"))
     .addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL)
+    .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
 
 
   import org.apache.spark.sql.CarbonSession._

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index 269a127..7545fe5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -215,6 +215,7 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
       parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
       parserSettings.setLineSeparatorDetectionEnabled(true);
       parserSettings.setNullValue("");
+      parserSettings.setEmptyValue("");
       parserSettings.setIgnoreLeadingWhitespaces(false);
       parserSettings.setIgnoreTrailingWhitespaces(false);
       parserSettings.setSkipEmptyLines(false);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index f33442b..f99fab4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -100,6 +100,7 @@ public class UnivocityCsvParser {
     parserSettings.setMaxColumns(
         getMaxColumnsForParsing(csvParserVo.getNumberOfColumns(), csvParserVo.getMaxColumns()));
     parserSettings.setNullValue("");
+    parserSettings.setEmptyValue("");
     parserSettings.setIgnoreLeadingWhitespaces(false);
     parserSettings.setIgnoreTrailingWhitespaces(false);
     parserSettings.setSkipEmptyLines(false);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index a628971..5f8c141 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -141,6 +141,11 @@ public class CarbonLoadModel implements Serializable {
 
   private String carbondataFileName = "";
 
+  /**
+   * defines the string to specify whether empty data is good or bad
+   */
+  private String isEmptyDataBadRecord;
+
   public String getCarbondataFileName() {
     return carbondataFileName;
   }
@@ -387,6 +392,7 @@ public class CarbonLoadModel implements Serializable {
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
     copy.preFetch = preFetch;
+    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
     return copy;
   }
 
@@ -437,6 +443,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
     copyObj.preFetch = preFetch;
+    copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
     return copyObj;
   }
 
@@ -760,4 +767,12 @@ public class CarbonLoadModel implements Serializable {
   public void setDefaultDateFormat(String defaultDateFormat) {
     this.defaultDateFormat = defaultDateFormat;
   }
+
+  public String getIsEmptyDataBadRecord() {
+    return isEmptyDataBadRecord;
+  }
+
+  public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
+    this.isEmptyDataBadRecord = isEmptyDataBadRecord;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 35b7989..f17779c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -130,6 +130,8 @@ public final class DataLoadProcessBuilder {
         loadModel.getBadRecordsLoggerEnable().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
         loadModel.getBadRecordsAction().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
+        loadModel.getIsEmptyDataBadRecord().split(",")[1]);
     configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
         loadModel.getFactFilePath());
     if (CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) == null) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
index 78a48b7..ea06cd0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -40,6 +40,8 @@ public final class DataLoadProcessorConstants {
 
   public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
 
+  public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
+
   public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
index 69bf6e3..42a6fec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
@@ -25,6 +25,8 @@ public class BadRecordLogHolder {
 
   private boolean badRecordAdded;
 
+  private boolean isLogged;
+
   public String getReason() {
     return reason;
   }
@@ -41,4 +43,12 @@ public class BadRecordLogHolder {
   public void clear() {
     this.badRecordAdded = false;
   }
+
+  public boolean isLogged() {
+    return isLogged;
+  }
+
+  public void setLogged(boolean logged) {
+    isLogged = logged;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
index 63dfc53..05ffe8b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -35,8 +35,10 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
   private String nullFormat;
 
   private CarbonColumn column;
+  private boolean isEmptyBadRecord;
 
-  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index) {
+  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
+      boolean isEmptyBadRecord) {
     this.nullFormat = nullFormat;
     this.column = dataField.getColumn();
     if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
@@ -49,6 +51,7 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
           .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
     }
     this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   @Override
@@ -65,10 +68,12 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
     } else {
       int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
       if (key == 1) {
-        logHolder.setReason(
-            "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
-                .getColName() + " and column data type " + column.getDataType() + " is not a valid "
-                + column.getDataType() + " type.");
+        if ((value.length() > 0) || (value.length() == 0 && isEmptyBadRecord)) {
+          logHolder.setReason(
+              "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+                  .getColName() + " and column data type " + column.getDataType()
+                  + " is not a valid " + column.getDataType() + " type.");
+        }
       }
       row.update(key, index);
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index 2a7ccec..660f256 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -57,19 +57,21 @@ public class FieldEncoderFactory {
    * @param cache                 dicionary cache.
    * @param carbonTableIdentifier table identifier
    * @param index                 index of column in the row.
+   * @param isEmptyBadRecord
    * @return
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
       DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
-      Map<Object, Integer> localCache)
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
       throws IOException {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimesion()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
-        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index);
+        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
+            isEmptyBadRecord);
       } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
@@ -79,10 +81,10 @@ public class FieldEncoderFactory {
             createComplexType(dataField, cache, carbonTableIdentifier,
                     client, useOnePass, storePath, tableInitialize, localCache), index);
       } else {
-        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
+        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       }
     } else {
-      return new MeasureFieldConverterImpl(dataField, nullFormat, index);
+      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
index dccbe7e..bb5f983 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
@@ -44,11 +44,15 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   private String nullformat;
 
-  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index) {
+  private boolean isEmptyBadRecord;
+
+  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
     this.dataType = dataField.getColumn().getDataType();
     this.measure = (CarbonMeasure) dataField.getColumn();
     this.nullformat = nullformat;
     this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   @Override
@@ -57,11 +61,18 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     String value = row.getString(index);
     Object output;
     boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value);
-    if (value == null || value.length() == 0 || isNull) {
+    if (value == null || isNull) {
       logHolder.setReason(
           "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
               + " and column data type " + dataType + " is not a valid " + dataType + " type.");
       row.update(null, index);
+    } else if (value.length() == 0) {
+      if (isEmptyBadRecord) {
+        logHolder.setReason(
+            "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+                + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+      }
+      row.update(null, index);
     } else if (value.equals(nullformat)) {
       row.update(null, index);
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index c044c51..9e4b50d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -37,11 +37,15 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
 
   private CarbonColumn column;
 
-  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index) {
+  private boolean isEmptyBadRecord;
+
+  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
     this.dataType = dataField.getColumn().getDataType();
     this.column = dataField.getColumn();
     this.index = index;
     this.nullformat = nullformat;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   @Override
@@ -52,10 +56,12 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
     }
     if (dataType != DataType.STRING) {
       if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
-        logHolder.setReason(
-            "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
-                .getColName() + " and column data type " + dataType + " is not a valid " + dataType
-                + " type.");
+        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+          logHolder.setReason(
+              "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
+                  .getColName() + " and column data type " + dataType + " is not a valid "
+                  + dataType + " type.");
+        }
       }
     }
     row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 42c8c7f..aa05798 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -88,6 +88,9 @@ public class RowConverterImpl implements RowConverter {
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
     List<FieldConverter> fieldConverterList = new ArrayList<>();
     localCaches = new Map[fields.length];
     long lruCacheStartTime = System.currentTimeMillis();
@@ -98,9 +101,9 @@ public class RowConverterImpl implements RowConverter {
       localCaches[i] = new ConcurrentHashMap<>();
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
-              client, configuration.getUseOnePass(),
-              configuration.getTableIdentifier().getStorePath(), true, localCaches[i]);
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+              configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+              true, localCaches[i], isEmptyBadRecord);
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -121,7 +124,7 @@ public class RowConverterImpl implements RowConverter {
           Thread.currentThread().setName("Dictionary client");
           DictionaryClient dictionaryClient = new DictionaryClient();
           dictionaryClient.startClient(configuration.getDictionaryServerHost(),
-                  configuration.getDictionaryServerPort());
+              configuration.getDictionaryServerPort());
           return dictionaryClient;
         }
       });
@@ -146,15 +149,18 @@ public class RowConverterImpl implements RowConverter {
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
     CarbonRow copy = row.getCopy();
+    logHolder.setLogged(false);
+    logHolder.clear();
     for (int i = 0; i < fieldConverters.length; i++) {
       fieldConverters[i].convert(row, logHolder);
-      if (logHolder.isBadRecordNotAdded()) {
+      if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
         if (badRecordLogger.isDataLoadFail()) {
           String error = "Data load failed due to bad bad record: " + logHolder.getReason();
           throw new CarbonDataLoadingException(error);
         }
         badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
         logHolder.clear();
+        logHolder.setLogged(true);
         if (badRecordLogger.isBadRecordConvertNullDisable()) {
           return null;
         }
@@ -202,14 +208,16 @@ public class RowConverterImpl implements RowConverter {
     String nullFormat =
         configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
             .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = null;
       try {
-        fieldConverter = FieldEncoderFactory.getInstance()
-            .createFieldEncoder(fields[i], cache,
-                configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
-                client, configuration.getUseOnePass(),
-                configuration.getTableIdentifier().getStorePath(), false, localCaches[i]);
+        fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
+            configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+            configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(), false,
+            localCaches[i], isEmptyBadRecord);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index c0a1cad..eae9672 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
 public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   private List<RowConverter> converters;
+  private BadRecordsLogger badRecordLogger;
 
   public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
@@ -60,7 +61,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   public void initialize() throws IOException {
     child.initialize();
     converters = new ArrayList<>();
-    BadRecordsLogger badRecordLogger = createBadRecordLogger();
+    badRecordLogger = createBadRecordLogger();
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
     converters.add(converter);
@@ -149,9 +150,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
-        identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
-            + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable,
-        badRecordConvertNullDisable, isDataLoadFail);
+        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+            + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
     return badRecordsLogger;
   }
 
@@ -166,7 +168,9 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   @Override
   public void close() {
     if (!closed) {
-      createBadRecordLogger().closeStreams();
+      if (null != badRecordLogger) {
+        badRecordLogger.closeStreams();
+      }
       super.close();
       if (converters != null) {
         for (RowConverter converter : converters) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 0277138..8fd2fba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -52,6 +52,8 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
 
   private Partitioner<Object[]> partitioner;
 
+  private BadRecordsLogger badRecordLogger;
+
   public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
@@ -66,7 +68,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   public void initialize() throws IOException {
     child.initialize();
     converters = new ArrayList<>();
-    BadRecordsLogger badRecordLogger = createBadRecordLogger();
+    badRecordLogger = createBadRecordLogger();
     RowConverter converter =
         new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
     converters.add(converter);
@@ -194,7 +196,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
   public void close() {
     if (!closed) {
       super.close();
-      createBadRecordLogger().closeStreams();
+      if (null != badRecordLogger) {
+        badRecordLogger.closeStreams();
+      }
       if (converters != null) {
         for (RowConverter converter : converters) {
           converter.finish();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 4192943..74a1574 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -75,6 +75,7 @@ import org.apache.carbondata.processing.store.colgroup.DataHolder;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 
 import org.apache.spark.sql.types.Decimal;
@@ -261,6 +262,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private long schemaUpdatedTimeStamp;
 
+  private String segmentId;
   /**
    * current data format version
    */
@@ -357,6 +359,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimLens = this.segmentProperties.getDimColumnsCardinality();
     this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
     this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
+    this.segmentId = carbonFactDataHandlerModel.getSegmentId();
     //TODO need to pass carbon table identifier to metadata
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
@@ -1143,6 +1146,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       LOGGER.info("All blocklets have been finished writing");
       // close all the open stream for both the files
       this.dataWriter.closeWriter();
+      // rename the bad record in progress to normal
+      CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+          this.databaseName + File.separator + this.tableName + File.separator + this.segmentId
+              + File.separator + this.carbonDataFileAttributes.getTaskId());
     }
     this.dataWriter = null;
     this.keyBlockHolder = null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 92b0007..c67dc0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -178,6 +178,8 @@ public class CarbonFactDataHandlerModel {
 
   private int bucketId = 0;
 
+  private String segmentId;
+
   /**
    * schema updated time stamp to be used for restructure scenarios
    */
@@ -290,6 +292,7 @@ public class CarbonFactDataHandlerModel {
       carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
     }
     carbonFactDataHandlerModel.bucketId = bucketId;
+    carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
     return carbonFactDataHandlerModel;
   }
 
@@ -513,5 +516,13 @@ public class CarbonFactDataHandlerModel {
   public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
     this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
   }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
 }
 


[2/2] incubator-carbondata git commit: [CARBONDATA-784] Empty data to be treated as bad record configurable. This closes #660

Posted by gv...@apache.org.
[CARBONDATA-784] Empty data to be treated as bad record configurable. This closes #660


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d4f07ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d4f07ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d4f07ae9

Branch: refs/heads/master
Commit: d4f07ae935dc30086ea834f46644d7732468d2c9
Parents: b8f1d3e 3251c89
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Fri Mar 17 13:18:01 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 17 13:18:01 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   7 +-
 .../badrecordloger/BadRecordEmptyDataTest.scala | 167 +++++++++++++++++++
 .../badrecordloger/BadRecordLoggerTest.scala    |   6 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  11 +-
 .../spark/sql/test/TestQueryExecutor.scala      |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../execution/command/carbonTableSchema.scala   |  12 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   9 +-
 .../execution/command/carbonTableSchema.scala   |  13 +-
 .../sql/test/Spark2TestQueryExecutor.scala      |   1 +
 .../processing/csvload/CSVInputFormat.java      |   1 +
 .../csvreaderstep/UnivocityCsvParser.java       |   1 +
 .../processing/model/CarbonLoadModel.java       |  15 ++
 .../newflow/DataLoadProcessBuilder.java         |   2 +
 .../constants/DataLoadProcessorConstants.java   |   2 +
 .../newflow/converter/BadRecordLogHolder.java   |  10 ++
 .../DirectDictionaryFieldConverterImpl.java     |  15 +-
 .../converter/impl/FieldEncoderFactory.java     |  10 +-
 .../impl/MeasureFieldConverterImpl.java         |  15 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |  16 +-
 .../converter/impl/RowConverterImpl.java        |  28 ++--
 .../steps/DataConverterProcessorStepImpl.java   |  14 +-
 ...ConverterProcessorWithBucketingStepImpl.java |   8 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   7 +
 .../store/CarbonFactDataHandlerModel.java       |  11 ++
 25 files changed, 347 insertions(+), 48 deletions(-)
----------------------------------------------------------------------