You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/03/17 07:48:47 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-784] Bad record
making configurable empty data not a bad record.
Repository: incubator-carbondata
Updated Branches:
refs/heads/master b8f1d3ec6 -> d4f07ae93
[CARBONDATA-784] Bad record making configurable empty data not a bad record.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3251c894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3251c894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3251c894
Branch: refs/heads/master
Commit: 3251c8941c2ba088cfcf3bd9ca9e6a7563e6bf67
Parents: b8f1d3e
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Wed Mar 8 15:34:00 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 17 13:08:03 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 7 +-
.../badrecordloger/BadRecordEmptyDataTest.scala | 167 +++++++++++++++++++
.../badrecordloger/BadRecordLoggerTest.scala | 6 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 11 +-
.../spark/sql/test/TestQueryExecutor.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +-
.../execution/command/carbonTableSchema.scala | 12 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +-
.../execution/command/carbonTableSchema.scala | 13 +-
.../sql/test/Spark2TestQueryExecutor.scala | 1 +
.../processing/csvload/CSVInputFormat.java | 1 +
.../csvreaderstep/UnivocityCsvParser.java | 1 +
.../processing/model/CarbonLoadModel.java | 15 ++
.../newflow/DataLoadProcessBuilder.java | 2 +
.../constants/DataLoadProcessorConstants.java | 2 +
.../newflow/converter/BadRecordLogHolder.java | 10 ++
.../DirectDictionaryFieldConverterImpl.java | 15 +-
.../converter/impl/FieldEncoderFactory.java | 10 +-
.../impl/MeasureFieldConverterImpl.java | 15 +-
.../impl/NonDictionaryFieldConverterImpl.java | 16 +-
.../converter/impl/RowConverterImpl.java | 28 ++--
.../steps/DataConverterProcessorStepImpl.java | 14 +-
...ConverterProcessorWithBucketingStepImpl.java | 8 +-
.../store/CarbonFactDataHandlerColumnar.java | 7 +
.../store/CarbonFactDataHandlerModel.java | 11 ++
25 files changed, 347 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ff257dd..853d3b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -185,8 +185,7 @@ public final class CarbonCommonConstants {
/**
* CARBON_BADRECORDS_LOCATION_DEFAULT
*/
- public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL =
- "../unibi-solutions/system/carbon/badRecords";
+ public static final String CARBON_BADRECORDS_LOC_DEFAULT_VAL = "/tmp/carbon/badRecords";
/**
* HIERARCHY_FILE_EXTENSION
*/
@@ -1164,6 +1163,10 @@ public final class CarbonCommonConstants {
public static final int SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT = 4000;
+ public static final String CARBON_BAD_RECORDS_ACTION = "carbon.bad.records.action";
+
+ public static final String CARBON_BAD_RECORDS_ACTION_DEFAULT = "FORCE";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
new file mode 100644
index 0000000..37dc26d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordEmptyDataTest.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.badrecordloger
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Test Class for bad record handling when empty data is
+ * configured (via IS_EMPTY_DATA_BAD_RECORD) not to be
+ * treated as a bad record.
+ */
+class BadRecordEmptyDataTest extends QueryTest with BeforeAndAfterAll {
+ var hiveContext: HiveContext = _
+
+ override def beforeAll {
+ try {
+ sql("drop table IF EXISTS emptyColumnValues")
+ sql("drop table IF EXISTS emptyColumnValues_false")
+ sql("drop table IF EXISTS empty_timestamp")
+ sql("drop table IF EXISTS empty_timestamp_false")
+ sql("drop table IF EXISTS dataloadOptionTests")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ new File("./target/test/badRecords")
+ .getCanonicalPath)
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ var csvFilePath = ""
+
+ // 1. empty data for string data type - take empty value
+ // 2. empty data for non-string data type - Bad records/Null value based on configuration
+ //table should have only two records.
+ sql(
+ """CREATE TABLE IF NOT EXISTS emptyColumnValues(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+ """)
+ csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+ +
+ "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='false'," +
+ " 'bad_records_action'='ignore', " +
+ "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+ // load with bad_records_logger_enable to false
+ sql(
+ """CREATE TABLE IF NOT EXISTS emptyColumnValues_false(ID BigInt, date Timestamp, country
+ String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+ """)
+ csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues_false OPTIONS"
+ + "('bad_records_logger_enable'='false', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+ // 4.1 Time stamp empty data - Bad records/Null value based on configuration
+ // 5. non-parsable data - Bad records/Null value based on configuration
+ // 6. empty line(check current one) - Bad records/Null value based on configuration
+ // only one value should be loaded.
+ sql(
+ """CREATE TABLE IF NOT EXISTS empty_timestamp(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+ """)
+ csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+ +
+ "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='false' ," +
+ "'bad_records_action'='ignore', " +
+ "'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+ // load with bad_records_logger_enable to false
+ sql(
+ """CREATE TABLE IF NOT EXISTS empty_timestamp_false(ID BigInt, date Timestamp, country
+ String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+ """)
+ csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp_false OPTIONS"
+ + "('bad_records_logger_enable'='false','IS_EMPTY_DATA_BAD_RECORD'='false'," +
+ " 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+
+
+ } catch {
+ case x: Throwable => CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+ }
+
+ test("select count(*) from empty_timestamp") {
+ checkAnswer(
+ sql("select count(*) from empty_timestamp"),
+ Seq(Row(2)
+ )
+ )
+ }
+
+ test("select count(*) from emptyColumnValues") {
+ checkAnswer(
+ sql("select count(*) from emptyColumnValues"),
+ Seq(Row(7)
+ )
+ )
+ }
+
+ test("select count(*) from emptyColumnValues_false") {
+ checkAnswer(
+ sql("select count(*) from emptyColumnValues_false"),
+ Seq(Row(7)
+ )
+ )
+ }
+
+ test("select count(*) from empty_timestamp_false") {
+ checkAnswer(
+ sql("select count(*) from empty_timestamp_false"),
+ Seq(Row(7)
+ )
+ )
+ }
+
+ test("test load ddl command") {
+ sql(
+ """CREATE TABLE IF NOT EXISTS dataloadOptionTests(ID BigInt, date Timestamp, country
+ String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'
+ """)
+ val csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
+ try {
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE dataloadOptionTests OPTIONS"
+ + "('IS_EMPTY_DATA_BAD_RECORD'='xyz', 'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
+ } catch {
+ case ex: Exception =>
+ assert("option IS_EMPTY_DATA_BAD_RECORD can have option either true or false"
+ .equals(ex.getMessage))
+ }
+ }
+
+ override def afterAll {
+ sql("drop table emptyColumnValues")
+ sql("drop table emptyColumnValues_false")
+ sql("drop table empty_timestamp")
+ sql("drop table empty_timestamp_false")
+ sql("drop table dataloadOptionTests")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
index 125c819..5716978 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerTest.scala
@@ -113,7 +113,8 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
csvFilePath = s"$resourcesPath/badrecords/emptyValues.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE emptyColumnValues OPTIONS"
+
- "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+ "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true'," +
+ " 'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable to false
sql(
@@ -137,7 +138,8 @@ class BadRecordLoggerTest extends QueryTest with BeforeAndAfterAll {
csvFilePath = s"$resourcesPath/badrecords/emptyTimeStampValue.csv"
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE empty_timestamp OPTIONS"
+
- "('bad_records_logger_enable'='true', 'bad_records_action'='ignore', " +
+ "('bad_records_logger_enable'='true','IS_EMPTY_DATA_BAD_RECORD'='true' ," +
+ "'bad_records_action'='ignore', " +
"'DELIMITER'= ',', 'QUOTECHAR'= '\"')");
// load with bad_records_logger_enable to false
sql(
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 56f6e6d..9c5ce83 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -84,6 +84,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val SERIALIZATION_NULL_FORMAT = carbonKeyWord("SERIALIZATION_NULL_FORMAT")
protected val BAD_RECORDS_LOGGER_ENABLE = carbonKeyWord("BAD_RECORDS_LOGGER_ENABLE")
protected val BAD_RECORDS_ACTION = carbonKeyWord("BAD_RECORDS_ACTION")
+ protected val IS_EMPTY_DATA_BAD_RECORD = carbonKeyWord("IS_EMPTY_DATA_BAD_RECORD")
+ protected val IS_EMPTY_COMMA_DATA_BAD_RECORD = carbonKeyWord("IS_NULL_DATA_BAD_RECORD")
protected val FILES = carbonKeyWord("FILES")
protected val FROM = carbonKeyWord("FROM")
protected val HIERARCHIES = carbonKeyWord("HIERARCHIES")
@@ -750,7 +752,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"COMPLEX_DELIMITER_LEVEL_1", "COMPLEX_DELIMITER_LEVEL_2", "COLUMNDICT",
"SERIALIZATION_NULL_FORMAT", "BAD_RECORDS_LOGGER_ENABLE", "BAD_RECORDS_ACTION",
"ALL_DICTIONARY_PATH", "MAXCOLUMNS", "COMMENTCHAR", "USE_KETTLE", "DATEFORMAT",
- "SINGLE_PASS"
+ "SINGLE_PASS", "IS_EMPTY_DATA_BAD_RECORD"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
@@ -797,6 +799,13 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"option BAD_RECORDS_ACTION can have only either FORCE or IGNORE or REDIRECT")
}
}
+ if (options.exists(_._1.equalsIgnoreCase("IS_EMPTY_DATA_BAD_RECORD"))) {
+ val optionValue: String = options.get("is_empty_data_bad_record").get.head._2
+ if (!("true".equalsIgnoreCase(optionValue) || "false".equalsIgnoreCase(optionValue))) {
+ throw new MalformedCarbonCommandException(
+ "option IS_EMPTY_DATA_BAD_RECORD can have option either true or false")
+ }
+ }
// check for duplicate options
val duplicateOptions = options filter {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index f284345..f8e2bc1 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.util.Utils
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
/**
* the sql executor of spark-common-test
@@ -50,7 +52,8 @@ object TestQueryExecutor {
val timestampFormat = "dd-MM-yyyy"
val INSTANCE = lookupQueryExecutor.newInstance().asInstanceOf[TestQueryExecutorRegister]
-
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
private def lookupQueryExecutor: Class[_] = {
ServiceLoader.load(classOf[TestQueryExecutorRegister], Utils.getContextOrSparkClassLoader)
.iterator().next().getClass
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index fe92aad..306d277 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -973,8 +973,13 @@ object CarbonDataRDDFactory {
shutDownDictionaryServer(carbonLoadModel, result)
- LOGGER.audit("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
+ LOGGER.audit("Data load is partially successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ } else {
+ LOGGER.audit("Data load is successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ }
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 62463fe..6f14feb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
@@ -416,7 +417,11 @@ case class LoadTable(
val columnDict = options.getOrElse("columndict", null)
val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
- val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+ val badRecordActionValue = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+ val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
+ val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
@@ -443,7 +448,10 @@ case class LoadTable(
TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
carbonLoadModel
.setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
+ TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
+ carbonLoadModel
+ .setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
// when single_pass=true, and use_kettle=false, and not use all dict
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 771e455..eb9e88b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -970,8 +970,13 @@ object CarbonDataRDDFactory {
}
shutdownDictionaryServer(carbonLoadModel, result)
- LOGGER.audit("Data load is successful for " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
+ LOGGER.audit("Data load is partially successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ } else {
+ LOGGER.audit("Data load is successful for " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ }
try {
// compaction handling
handleSegmentMerging(tableCreationTime)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 2b75c56..002b6f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.processing.constants.TableOptionConstant
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, DataTypeConverterUtil, GlobalDictionaryUtil}
@@ -418,7 +419,11 @@ case class LoadTable(
val columnDict = options.getOrElse("columndict", null)
val serializationNullFormat = options.getOrElse("serialization_null_format", "\\N")
val badRecordsLoggerEnable = options.getOrElse("bad_records_logger_enable", "false")
- val badRecordsLoggerRedirect = options.getOrElse("bad_records_action", "force")
+ val badRecordActionValue = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+ val badRecordsAction = options.getOrElse("bad_records_action", badRecordActionValue)
+ val isEmptyDataBadRecord = options.getOrElse("is_empty_data_bad_record", "false")
val allDictionaryPath = options.getOrElse("all_dictionary_path", "")
val complex_delimiter_level_1 = options.getOrElse("complex_delimiter_level_1", "\\$")
val complex_delimiter_level_2 = options.getOrElse("complex_delimiter_level_2", "\\:")
@@ -444,8 +449,10 @@ case class LoadTable(
TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + badRecordsLoggerEnable)
carbonLoadModel
.setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsLoggerRedirect)
-
+ TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + badRecordsAction)
+ carbonLoadModel
+ .setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index b8f715f..15d5597 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -43,6 +43,7 @@ object Spark2TestQueryExecutor {
.addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir"))
.addProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_LOCAL)
+ .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE")
import org.apache.spark.sql.CarbonSession._
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
index 269a127..7545fe5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -215,6 +215,7 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
parserSettings.setLineSeparatorDetectionEnabled(true);
parserSettings.setNullValue("");
+ parserSettings.setEmptyValue("");
parserSettings.setIgnoreLeadingWhitespaces(false);
parserSettings.setIgnoreTrailingWhitespaces(false);
parserSettings.setSkipEmptyLines(false);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index f33442b..f99fab4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -100,6 +100,7 @@ public class UnivocityCsvParser {
parserSettings.setMaxColumns(
getMaxColumnsForParsing(csvParserVo.getNumberOfColumns(), csvParserVo.getMaxColumns()));
parserSettings.setNullValue("");
+ parserSettings.setEmptyValue("");
parserSettings.setIgnoreLeadingWhitespaces(false);
parserSettings.setIgnoreTrailingWhitespaces(false);
parserSettings.setSkipEmptyLines(false);
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index a628971..5f8c141 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -141,6 +141,11 @@ public class CarbonLoadModel implements Serializable {
private String carbondataFileName = "";
+ /**
+ * defines the string to specify whether empty data is good or bad
+ */
+ private String isEmptyDataBadRecord;
+
public String getCarbondataFileName() {
return carbondataFileName;
}
@@ -387,6 +392,7 @@ public class CarbonLoadModel implements Serializable {
copy.dictionaryServerHost = dictionaryServerHost;
copy.dictionaryServerPort = dictionaryServerPort;
copy.preFetch = preFetch;
+ copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
return copy;
}
@@ -437,6 +443,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.dictionaryServerHost = dictionaryServerHost;
copyObj.dictionaryServerPort = dictionaryServerPort;
copyObj.preFetch = preFetch;
+ copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
return copyObj;
}
@@ -760,4 +767,12 @@ public class CarbonLoadModel implements Serializable {
public void setDefaultDateFormat(String defaultDateFormat) {
this.defaultDateFormat = defaultDateFormat;
}
+
+ public String getIsEmptyDataBadRecord() {
+ return isEmptyDataBadRecord;
+ }
+
+ public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
+ this.isEmptyDataBadRecord = isEmptyDataBadRecord;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 35b7989..f17779c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -130,6 +130,8 @@ public final class DataLoadProcessBuilder {
loadModel.getBadRecordsLoggerEnable().split(",")[1]);
configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
loadModel.getBadRecordsAction().split(",")[1]);
+ configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
+ loadModel.getIsEmptyDataBadRecord().split(",")[1]);
configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
loadModel.getFactFilePath());
if (CarbonMetadata.getInstance().getCarbonTable(carbonTable.getTableUniqueName()) == null) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
index 78a48b7..ea06cd0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -40,6 +40,8 @@ public final class DataLoadProcessorConstants {
public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
+ public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
+
public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
index 69bf6e3..42a6fec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
@@ -25,6 +25,8 @@ public class BadRecordLogHolder {
private boolean badRecordAdded;
+ private boolean isLogged;
+
public String getReason() {
return reason;
}
@@ -41,4 +43,12 @@ public class BadRecordLogHolder {
public void clear() {
this.badRecordAdded = false;
}
+
+ public boolean isLogged() {
+ return isLogged;
+ }
+
+ public void setLogged(boolean logged) {
+ isLogged = logged;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
index 63dfc53..05ffe8b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -35,8 +35,10 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
private String nullFormat;
private CarbonColumn column;
+ private boolean isEmptyBadRecord;
- public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index) {
+ public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
+ boolean isEmptyBadRecord) {
this.nullFormat = nullFormat;
this.column = dataField.getColumn();
if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
@@ -49,6 +51,7 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
.getDirectDictionaryGenerator(dataField.getColumn().getDataType());
}
this.index = index;
+ this.isEmptyBadRecord = isEmptyBadRecord;
}
@Override
@@ -65,10 +68,12 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
} else {
int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
if (key == 1) {
- logHolder.setReason(
- "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
- .getColName() + " and column data type " + column.getDataType() + " is not a valid "
- + column.getDataType() + " type.");
+ if ((value.length() > 0) || (value.length() == 0 && isEmptyBadRecord)) {
+ logHolder.setReason(
+ "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+ .getColName() + " and column data type " + column.getDataType()
+ + " is not a valid " + column.getDataType() + " type.");
+ }
}
row.update(key, index);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index 2a7ccec..660f256 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -57,19 +57,21 @@ public class FieldEncoderFactory {
* @param cache dicionary cache.
* @param carbonTableIdentifier table identifier
* @param index index of column in the row.
+ * @param isEmptyBadRecord
* @return
*/
public FieldConverter createFieldEncoder(DataField dataField,
Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
- Map<Object, Integer> localCache)
+ Map<Object, Integer> localCache, boolean isEmptyBadRecord)
throws IOException {
// Converters are only needed for dimensions and measures it return null.
if (dataField.getColumn().isDimesion()) {
if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!dataField.getColumn().isComplex()) {
- return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index);
+ return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
+ isEmptyBadRecord);
} else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
!dataField.getColumn().isComplex()) {
return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
@@ -79,10 +81,10 @@ public class FieldEncoderFactory {
createComplexType(dataField, cache, carbonTableIdentifier,
client, useOnePass, storePath, tableInitialize, localCache), index);
} else {
- return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
+ return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
} else {
- return new MeasureFieldConverterImpl(dataField, nullFormat, index);
+ return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
index dccbe7e..bb5f983 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
@@ -44,11 +44,15 @@ public class MeasureFieldConverterImpl implements FieldConverter {
private String nullformat;
- public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index) {
+ private boolean isEmptyBadRecord;
+
+ public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
+ boolean isEmptyBadRecord) {
this.dataType = dataField.getColumn().getDataType();
this.measure = (CarbonMeasure) dataField.getColumn();
this.nullformat = nullformat;
this.index = index;
+ this.isEmptyBadRecord = isEmptyBadRecord;
}
@Override
@@ -57,11 +61,18 @@ public class MeasureFieldConverterImpl implements FieldConverter {
String value = row.getString(index);
Object output;
boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value);
- if (value == null || value.length() == 0 || isNull) {
+ if (value == null || isNull) {
logHolder.setReason(
"The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+ " and column data type " + dataType + " is not a valid " + dataType + " type.");
row.update(null, index);
+ } else if (value.length() == 0) {
+ if (isEmptyBadRecord) {
+ logHolder.setReason(
+ "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+ + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+ }
+ row.update(null, index);
} else if (value.equals(nullformat)) {
row.update(null, index);
} else {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index c044c51..9e4b50d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -37,11 +37,15 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
private CarbonColumn column;
- public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index) {
+ private boolean isEmptyBadRecord;
+
+ public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
+ boolean isEmptyBadRecord) {
this.dataType = dataField.getColumn().getDataType();
this.column = dataField.getColumn();
this.index = index;
this.nullformat = nullformat;
+ this.isEmptyBadRecord = isEmptyBadRecord;
}
@Override
@@ -52,10 +56,12 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
}
if (dataType != DataType.STRING) {
if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
- logHolder.setReason(
- "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
- .getColName() + " and column data type " + dataType + " is not a valid " + dataType
- + " type.");
+ if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+ logHolder.setReason(
+ "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
+ .getColName() + " and column data type " + dataType + " is not a valid "
+ + dataType + " type.");
+ }
}
}
row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 42c8c7f..aa05798 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -88,6 +88,9 @@ public class RowConverterImpl implements RowConverter {
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
List<FieldConverter> fieldConverterList = new ArrayList<>();
localCaches = new Map[fields.length];
long lruCacheStartTime = System.currentTimeMillis();
@@ -98,9 +101,9 @@ public class RowConverterImpl implements RowConverter {
localCaches[i] = new ConcurrentHashMap<>();
FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
.createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
- client, configuration.getUseOnePass(),
- configuration.getTableIdentifier().getStorePath(), true, localCaches[i]);
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+ configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+ true, localCaches[i], isEmptyBadRecord);
fieldConverterList.add(fieldConverter);
}
CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -121,7 +124,7 @@ public class RowConverterImpl implements RowConverter {
Thread.currentThread().setName("Dictionary client");
DictionaryClient dictionaryClient = new DictionaryClient();
dictionaryClient.startClient(configuration.getDictionaryServerHost(),
- configuration.getDictionaryServerPort());
+ configuration.getDictionaryServerPort());
return dictionaryClient;
}
});
@@ -146,15 +149,18 @@ public class RowConverterImpl implements RowConverter {
@Override
public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
CarbonRow copy = row.getCopy();
+ logHolder.setLogged(false);
+ logHolder.clear();
for (int i = 0; i < fieldConverters.length; i++) {
fieldConverters[i].convert(row, logHolder);
- if (logHolder.isBadRecordNotAdded()) {
+ if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
if (badRecordLogger.isDataLoadFail()) {
String error = "Data load failed due to bad bad record: " + logHolder.getReason();
throw new CarbonDataLoadingException(error);
}
badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
logHolder.clear();
+ logHolder.setLogged(true);
if (badRecordLogger.isBadRecordConvertNullDisable()) {
return null;
}
@@ -202,14 +208,16 @@ public class RowConverterImpl implements RowConverter {
String nullFormat =
configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
.toString();
+ boolean isEmptyBadRecord = Boolean.parseBoolean(
+ configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+ .toString());
for (int i = 0; i < fields.length; i++) {
FieldConverter fieldConverter = null;
try {
- fieldConverter = FieldEncoderFactory.getInstance()
- .createFieldEncoder(fields[i], cache,
- configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
- client, configuration.getUseOnePass(),
- configuration.getTableIdentifier().getStorePath(), false, localCaches[i]);
+ fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
+ configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+ configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(), false,
+ localCaches[i], isEmptyBadRecord);
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index c0a1cad..eae9672 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecor
public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
private List<RowConverter> converters;
+ private BadRecordsLogger badRecordLogger;
public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
@@ -60,7 +61,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
public void initialize() throws IOException {
child.initialize();
converters = new ArrayList<>();
- BadRecordsLogger badRecordLogger = createBadRecordLogger();
+ badRecordLogger = createBadRecordLogger();
RowConverter converter =
new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
converters.add(converter);
@@ -149,9 +150,10 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
configuration.getTableIdentifier().getCarbonTableIdentifier();
BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
- identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator
- + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable,
- badRecordConvertNullDisable, isDataLoadFail);
+ identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+ .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+ + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+ badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
return badRecordsLogger;
}
@@ -166,7 +168,9 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
@Override
public void close() {
if (!closed) {
- createBadRecordLogger().closeStreams();
+ if (null != badRecordLogger) {
+ badRecordLogger.closeStreams();
+ }
super.close();
if (converters != null) {
for (RowConverter converter : converters) {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 0277138..8fd2fba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -52,6 +52,8 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
private Partitioner<Object[]> partitioner;
+ private BadRecordsLogger badRecordLogger;
+
public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
AbstractDataLoadProcessorStep child) {
super(configuration, child);
@@ -66,7 +68,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
public void initialize() throws IOException {
child.initialize();
converters = new ArrayList<>();
- BadRecordsLogger badRecordLogger = createBadRecordLogger();
+ badRecordLogger = createBadRecordLogger();
RowConverter converter =
new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
converters.add(converter);
@@ -194,7 +196,9 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa
public void close() {
if (!closed) {
super.close();
- createBadRecordLogger().closeStreams();
+ if (null != badRecordLogger) {
+ badRecordLogger.closeStreams();
+ }
if (converters != null) {
for (RowConverter converter : converters) {
converter.finish();
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 4192943..74a1574 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -75,6 +75,7 @@ import org.apache.carbondata.processing.store.colgroup.DataHolder;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
import org.apache.spark.sql.types.Decimal;
@@ -261,6 +262,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
private long schemaUpdatedTimeStamp;
+ private String segmentId;
/**
* current data format version
*/
@@ -357,6 +359,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
this.dimLens = this.segmentProperties.getDimColumnsCardinality();
this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
+ this.segmentId = carbonFactDataHandlerModel.getSegmentId();
//TODO need to pass carbon table identifier to metadata
CarbonTable carbonTable = CarbonMetadata.getInstance()
.getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
@@ -1143,6 +1146,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
LOGGER.info("All blocklets have been finished writing");
// close all the open stream for both the files
this.dataWriter.closeWriter();
+ // rename the bad record in progress to normal
+ CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(
+ this.databaseName + File.separator + this.tableName + File.separator + this.segmentId
+ + File.separator + this.carbonDataFileAttributes.getTaskId());
}
this.dataWriter = null;
this.keyBlockHolder = null;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3251c894/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 92b0007..c67dc0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -178,6 +178,8 @@ public class CarbonFactDataHandlerModel {
private int bucketId = 0;
+ private String segmentId;
+
/**
* schema updated time stamp to be used for restructure scenarios
*/
@@ -290,6 +292,7 @@ public class CarbonFactDataHandlerModel {
carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
}
carbonFactDataHandlerModel.bucketId = bucketId;
+ carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
return carbonFactDataHandlerModel;
}
@@ -513,5 +516,13 @@ public class CarbonFactDataHandlerModel {
public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
}
+
+ public String getSegmentId() {
+ return segmentId;
+ }
+
+ public void setSegmentId(String segmentId) {
+ this.segmentId = segmentId;
+ }
}
[2/2] incubator-carbondata git commit: [CARBONDATA-784] Empty data to
be treated as bad record configurable. This closes #660
Posted by gv...@apache.org.
[CARBONDATA-784] Empty data to be treated as bad record configurable. This closes #660
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d4f07ae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d4f07ae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d4f07ae9
Branch: refs/heads/master
Commit: d4f07ae935dc30086ea834f46644d7732468d2c9
Parents: b8f1d3e 3251c89
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Fri Mar 17 13:18:01 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 17 13:18:01 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 7 +-
.../badrecordloger/BadRecordEmptyDataTest.scala | 167 +++++++++++++++++++
.../badrecordloger/BadRecordLoggerTest.scala | 6 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 11 +-
.../spark/sql/test/TestQueryExecutor.scala | 5 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +-
.../execution/command/carbonTableSchema.scala | 12 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 +-
.../execution/command/carbonTableSchema.scala | 13 +-
.../sql/test/Spark2TestQueryExecutor.scala | 1 +
.../processing/csvload/CSVInputFormat.java | 1 +
.../csvreaderstep/UnivocityCsvParser.java | 1 +
.../processing/model/CarbonLoadModel.java | 15 ++
.../newflow/DataLoadProcessBuilder.java | 2 +
.../constants/DataLoadProcessorConstants.java | 2 +
.../newflow/converter/BadRecordLogHolder.java | 10 ++
.../DirectDictionaryFieldConverterImpl.java | 15 +-
.../converter/impl/FieldEncoderFactory.java | 10 +-
.../impl/MeasureFieldConverterImpl.java | 15 +-
.../impl/NonDictionaryFieldConverterImpl.java | 16 +-
.../converter/impl/RowConverterImpl.java | 28 ++--
.../steps/DataConverterProcessorStepImpl.java | 14 +-
...ConverterProcessorWithBucketingStepImpl.java | 8 +-
.../store/CarbonFactDataHandlerColumnar.java | 7 +
.../store/CarbonFactDataHandlerModel.java | 11 ++
25 files changed, 347 insertions(+), 48 deletions(-)
----------------------------------------------------------------------