You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:38 UTC
[13/50] [abbrv] carbondata git commit: [CARBONDATA-2136] Fixed bug
related to data load for bad_record_action as REDIRECT or IGNORE and sort
scope as NO_SORT
[CARBONDATA-2136] Fixed bug related to data load for bad_record_action as REDIRECT or IGNORE and sort scope as NO_SORT
Problem: When data was loaded with bad_record_action set to REDIRECT or IGNORE and
sort_scope set to NO_SORT, the load threw an error. Rows rejected by the converter were stored in the row batch as null entries.
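Solution: Rows for which the converter returns null are now removed from the CarbonRowBatch, using a newly implemented CarbonRowBatch.remove(), instead of being stored. The batch therefore no longer contains nulls when bad_record_action is REDIRECT or IGNORE and sort scope is NO_SORT.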
This closes #1942
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ebfab15
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ebfab15
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ebfab15
Branch: refs/heads/spark-2.3
Commit: 2ebfab151dbf79a6e0cd19198f53e14a165a2759
Parents: 5969312
Author: Geetika Gupta <ge...@knoldus.in>
Authored: Wed Feb 7 16:12:09 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 22 10:36:35 2018 +0530
----------------------------------------------------------------------
.../badrecordloger/BadRecordActionTest.scala | 189 +++++++++++++++----
.../processing/loading/row/CarbonRowBatch.java | 5 +-
.../steps/DataConverterProcessorStepImpl.java | 12 +-
3 files changed, 161 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index d85ee49..959aa6a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -1,30 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.carbondata.spark.testsuite.badrecordloger
import java.io.File
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
+class BadRecordActionTest extends QueryTest {
val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
- def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
- .getCanonicalPath
- val badRecordFilePath: File =new File(currentPath + "/target/test/badRecords")
+ val badRecordFilePath = new File(currentPath + "/target/test/badRecords")
+ initCarbonProperties
- override def beforeAll = {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+ private def initCarbonProperties = {
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
- badRecordFilePath.mkdirs()
- sql("drop table if exists sales")
+ badRecordFilePath.mkdirs()
}
test("test load for bad_record_action=force") {
@@ -34,7 +47,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='force', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
checkAnswer(sql("select count(*) from sales"),
Seq(Row(6)))
@@ -47,7 +60,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='FORCE', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
checkAnswer(sql("select count(*) from sales"),
Seq(Row(6)))
}
@@ -57,11 +70,17 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
- intercept[Exception] {
+ val exception = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='fail', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
+ assert(exception.getMessage
+ .contains(
+ "Data load failed due to bad record: The value with column name date and column data " +
+ "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+ " the detail reason"))
+
}
test("test load for bad_record_action=FAIL") {
@@ -69,13 +88,19 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
sql(
"""CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
- intercept[Exception] {
+ val exception = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='FAIL', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
+ assert(exception.getMessage
+ .contains(
+ "Data load failed due to bad record: The value with column name date and column data " +
+ "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+ " the detail reason"))
}
+
test("test load for bad_record_action=ignore") {
sql("drop table if exists sales")
sql(
@@ -83,7 +108,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='ignore', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
checkAnswer(sql("select count(*) from sales"),
Seq(Row(2)))
}
@@ -95,7 +120,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='IGNORE', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
checkAnswer(sql("select count(*) from sales"),
Seq(Row(2)))
}
@@ -108,25 +133,25 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='')")
+ " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
}
test("test bad record REDIRECT but not having empty location in option should throw exception") {
+ sql("drop table if exists sales")
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
val badRecordLocation = CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
- sql("drop table if exists sales")
try {
- sql(
- """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
- actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
val exMessage = intercept[Exception] {
sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
"('bad_records_action'='REDIRECT', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
}
assert(exMessage.getMessage.contains("Invalid bad records location."))
}
@@ -138,29 +163,113 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
test("test bad record is REDIRECT with location in carbon properties should pass") {
sql("drop table if exists sales")
- sql(
- """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
- sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
- "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"')")
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
+ "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales"),
+ Seq(Row(2)))
}
test("test bad record is redirect with location in option while data loading should pass") {
sql("drop table if exists sales")
- sql(
- """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
- sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
- "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
- " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + {badRecordFilePath.getCanonicalPath} +
- "')")
- checkAnswer(sql("select count(*) from sales"),
- Seq(Row(2)))
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
+ "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+ "','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales"),
+ Seq(Row(2)))
}
- override def afterAll() = {
- sql("drop table if exists sales")
+ test("test bad record FORCE option with no_sort as sort scope ") {
+ sql("drop table if exists sales_no_sort")
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+ "('bad_records_action'='FORCE', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales_no_sort"),
+ Seq(Row(6)))
+ }
+
+ test("test bad record REDIRECT option with location and no_sort as sort scope ") {
+ sql("drop table if exists sales_no_sort")
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+ "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+ "','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales_no_sort"),
+ Seq(Row(2)))
+ }
+
+ test("test bad record IGNORE option with no_sort as sort scope ") {
+ sql("drop table if exists sales_no_sort")
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+ "('bad_records_action'='IGNORE', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales_no_sort"),
+ Seq(Row(2)))
}
-}
+ test("test bad record with FAIL option with location and no_sort as sort scope ") {
+ sql("drop table if exists sales_no_sort")
+ sql(
+ """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+ actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+ val exception = intercept[Exception] {
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+ "('bad_records_action'='FAIL', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+ }
+ assert(exception.getMessage
+ .contains(
+ "Data load failed due to bad record: The value with column name date and column data " +
+ "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+ " the detail reason"))
+ }
+
+ test("test bad record with IGNORE option and sort scope as NO_SORT for bucketed table") {
+ sql("drop table if exists sales_bucket")
+ sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, country String," +
+ "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country','sort_scope'='NO_SORT')")
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" +
+ "('bad_records_action'='IGNORE', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales_bucket"),
+ Seq(Row(2)))
+ }
+
+ test("test bad record with REDIRECT option and sort scope as NO_SORT for bucketed table") {
+ sql("drop table if exists sales_bucket")
+ sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, country String," +
+ "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country', 'sort_scope'='NO_SORT')")
+ sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" +
+ "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+ " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+ "','timestampformat'='yyyy/MM/dd')")
+ checkAnswer(sql("select count(*) from sales_bucket"),
+ Seq(Row(2)))
+ }
+
+
+ private def currentPath: String = {
+ new File(this.getClass.getResource("/").getPath + "../../")
+ .getCanonicalPath
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
index e819dcd..6b75aa3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
@@ -22,6 +22,7 @@ import java.util.NoSuchElementException;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.commons.lang.ArrayUtils;
/**
* Batch of rows.
@@ -59,7 +60,9 @@ public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
}
@Override public void remove() {
-
+ rowBatch = (CarbonRow[]) ArrayUtils.remove(rowBatch, index - 1);
+ --size;
+ --index;
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 72a8c25..43b2278 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -192,11 +192,15 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
while (rowBatch.hasNext()) {
CarbonRow convertRow = localConverter.convert(rowBatch.next());
- if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
- short rangeNumber = (short) partitioner.getPartition(convertRow);
- convertRow.setRangeId(rangeNumber);
+ if (convertRow == null) {
+ rowBatch.remove();
+ } else {
+ if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
+ short rangeNumber = (short) partitioner.getPartition(convertRow);
+ convertRow.setRangeId(rangeNumber);
+ }
+ rowBatch.setPreviousRow(convertRow);
}
- rowBatch.setPreviousRow(convertRow);
}
rowCounter.getAndAdd(rowBatch.getSize());
// reuse the origin batch