You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/06/08 11:40:38 UTC

[13/50] [abbrv] carbondata git commit: [CARBONDATA-2136] Fixed bug related to data load for bad_record_action as REDIRECT or IGNORE and sort scope as NO_SORT

[CARBONDATA-2136] Fixed bug related to data load for bad_record_action as REDIRECT or IGNORE and sort scope as NO_SORT

Problem: When data was loaded with bad_record_action set to REDIRECT or IGNORE and
sort_scope set to NO_SORT, the load failed with an error. The rows rejected by the converter were returned as null and left in the row batch.

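Solution: Rows that the converter rejects (returned as null) are now removed from the carbon row batch instead of being stored in it. The returned batch therefore contains no null rows when bad_record_action is REDIRECT or IGNORE and sort scope is NO_SORT.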

This closes #1942


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2ebfab15
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2ebfab15
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2ebfab15

Branch: refs/heads/spark-2.3
Commit: 2ebfab151dbf79a6e0cd19198f53e14a165a2759
Parents: 5969312
Author: Geetika Gupta <ge...@knoldus.in>
Authored: Wed Feb 7 16:12:09 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 22 10:36:35 2018 +0530

----------------------------------------------------------------------
 .../badrecordloger/BadRecordActionTest.scala    | 189 +++++++++++++++----
 .../processing/loading/row/CarbonRowBatch.java  |   5 +-
 .../steps/DataConverterProcessorStepImpl.java   |  12 +-
 3 files changed, 161 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
index d85ee49..959aa6a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordActionTest.scala
@@ -1,30 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.carbondata.spark.testsuite.badrecordloger
 
 import java.io.File
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.BeforeAndAfterEach
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 
-class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
+class BadRecordActionTest extends QueryTest {
 
 
   val csvFilePath = s"$resourcesPath/badrecords/datasample.csv"
-  def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../")
-    .getCanonicalPath
-  val badRecordFilePath: File =new File(currentPath + "/target/test/badRecords")
+  val badRecordFilePath = new File(currentPath + "/target/test/badRecords")
+  initCarbonProperties
 
-  override def beforeAll = {
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+  private def initCarbonProperties = {
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
-        badRecordFilePath.mkdirs()
-    sql("drop table if exists sales")
+    badRecordFilePath.mkdirs()
   }
 
   test("test load for bad_record_action=force") {
@@ -34,7 +47,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
         "('bad_records_action'='force', 'DELIMITER'=" +
-        " ',', 'QUOTECHAR'= '\"')")
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     checkAnswer(sql("select count(*) from sales"),
       Seq(Row(6)))
 
@@ -47,7 +60,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
         "('bad_records_action'='FORCE', 'DELIMITER'=" +
-        " ',', 'QUOTECHAR'= '\"')")
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     checkAnswer(sql("select count(*) from sales"),
       Seq(Row(6)))
   }
@@ -57,11 +70,17 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
     sql(
       """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
-    intercept[Exception] {
+    val exception = intercept[Exception] {
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
           "('bad_records_action'='fail', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"')")
+          " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     }
+    assert(exception.getMessage
+      .contains(
+        "Data load failed due to bad record: The value with column name date and column data " +
+        "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+        " the detail reason"))
+
   }
 
   test("test load for bad_record_action=FAIL") {
@@ -69,13 +88,19 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
     sql(
       """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
-    intercept[Exception] {
+    val exception = intercept[Exception] {
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
           "('bad_records_action'='FAIL', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"')")
+          " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     }
+    assert(exception.getMessage
+      .contains(
+        "Data load failed due to bad record: The value with column name date and column data " +
+        "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+        " the detail reason"))
   }
 
+
   test("test load for bad_record_action=ignore") {
     sql("drop table if exists sales")
     sql(
@@ -83,7 +108,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
         "('bad_records_action'='ignore', 'DELIMITER'=" +
-        " ',', 'QUOTECHAR'= '\"')")
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     checkAnswer(sql("select count(*) from sales"),
       Seq(Row(2)))
   }
@@ -95,7 +120,7 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
     sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
         "('bad_records_action'='IGNORE', 'DELIMITER'=" +
-        " ',', 'QUOTECHAR'= '\"')")
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
     checkAnswer(sql("select count(*) from sales"),
       Seq(Row(2)))
   }
@@ -108,25 +133,25 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
     val exMessage = intercept[Exception] {
       sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
           "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='')")
+          " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='','timestampformat'='yyyy/MM/dd')")
     }
     assert(exMessage.getMessage.contains("Invalid bad records location."))
   }
 
   test("test bad record REDIRECT but not having empty location in option should throw exception") {
+    sql("drop table if exists sales")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
     val badRecordLocation = CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
       CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)
-    sql("drop table if exists sales")
     try {
-      sql(
-        """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
-          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
       val exMessage = intercept[Exception] {
         sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
             "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
-            " ',', 'QUOTECHAR'= '\"')")
+            " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
       }
       assert(exMessage.getMessage.contains("Invalid bad records location."))
     }
@@ -138,29 +163,113 @@ class BadRecordActionTest extends QueryTest with BeforeAndAfterAll {
 
   test("test bad record is REDIRECT with location in carbon properties should pass") {
     sql("drop table if exists sales")
-      sql(
-        """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
-      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
-          "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"')")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
+        "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(2)))
   }
 
   test("test bad record is redirect with location in option while data loading should pass") {
     sql("drop table if exists sales")
-         sql(
-        """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales(ID BigInt, date Timestamp, country String,
           actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata'""")
-      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
-          "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
-          " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + {badRecordFilePath.getCanonicalPath} +
-          "')")
-      checkAnswer(sql("select count(*) from sales"),
-        Seq(Row(2)))
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales OPTIONS" +
+        "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+        "','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales"),
+      Seq(Row(2)))
   }
 
-  override def afterAll() = {
-    sql("drop table if exists sales")
+  test("test bad record FORCE option with no_sort as sort scope ") {
+    sql("drop table if exists sales_no_sort")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+        "('bad_records_action'='FORCE', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales_no_sort"),
+      Seq(Row(6)))
+  }
+
+  test("test bad record REDIRECT option with location and no_sort as sort scope ") {
+    sql("drop table if exists sales_no_sort")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+        "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+        "','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales_no_sort"),
+      Seq(Row(2)))
+  }
+
+  test("test bad record IGNORE option with no_sort as sort scope ") {
+    sql("drop table if exists sales_no_sort")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+        "('bad_records_action'='IGNORE', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales_no_sort"),
+      Seq(Row(2)))
   }
 
-}
+  test("test bad record with FAIL option with location and no_sort as sort scope ") {
+    sql("drop table if exists sales_no_sort")
+    sql(
+      """CREATE TABLE IF NOT EXISTS sales_no_sort(ID BigInt, date Timestamp, country String,
+          actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' tblproperties('sort_scope'='NO_SORT')""")
+
+    val exception = intercept[Exception] {
+      sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_no_sort OPTIONS" +
+          "('bad_records_action'='FAIL', 'DELIMITER'=" +
+          " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+    }
+    assert(exception.getMessage
+      .contains(
+        "Data load failed due to bad record: The value with column name date and column data " +
+        "type TIMESTAMP is not a valid TIMESTAMP type.Please enable bad record logger to know" +
+        " the detail reason"))
+  }
+
+  test("test bad record with IGNORE option and sort scope as NO_SORT for bucketed table") {
+    sql("drop table if exists sales_bucket")
+    sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, country String," +
+          "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country','sort_scope'='NO_SORT')")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" +
+        "('bad_records_action'='IGNORE', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales_bucket"),
+      Seq(Row(2)))
+  }
+
+  test("test bad record with REDIRECT option and sort scope as NO_SORT for bucketed table") {
+    sql("drop table if exists sales_bucket")
+    sql("CREATE TABLE IF NOT EXISTS sales_bucket(ID BigInt, date Timestamp, country String," +
+        "actual_price Double, Quantity int, sold_price Decimal(19,2)) STORED BY 'carbondata' TBLPROPERTIES ('BUCKETNUMBER'='2', 'BUCKETCOLUMNS'='country', 'sort_scope'='NO_SORT')")
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO TABLE sales_bucket OPTIONS" +
+        "('bad_records_action'='REDIRECT', 'DELIMITER'=" +
+        " ',', 'QUOTECHAR'= '\"', 'BAD_RECORD_PATH'='" + { badRecordFilePath.getCanonicalPath } +
+        "','timestampformat'='yyyy/MM/dd')")
+    checkAnswer(sql("select count(*) from sales_bucket"),
+      Seq(Row(2)))
+  }
+
+
+  private def currentPath: String = {
+    new File(this.getClass.getResource("/").getPath + "../../")
+      .getCanonicalPath
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
index e819dcd..6b75aa3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/CarbonRowBatch.java
@@ -22,6 +22,7 @@ import java.util.NoSuchElementException;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 
+import org.apache.commons.lang.ArrayUtils;
 
 /**
  * Batch of rows.
@@ -59,7 +60,9 @@ public class CarbonRowBatch extends CarbonIterator<CarbonRow> {
   }
 
   @Override public void remove() {
-
+    rowBatch = (CarbonRow[]) ArrayUtils.remove(rowBatch, index - 1);
+    --size;
+    --index;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2ebfab15/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index 72a8c25..43b2278 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -192,11 +192,15 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
   protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
     while (rowBatch.hasNext()) {
       CarbonRow convertRow = localConverter.convert(rowBatch.next());
-      if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
-        short rangeNumber = (short) partitioner.getPartition(convertRow);
-        convertRow.setRangeId(rangeNumber);
+      if (convertRow == null) {
+        rowBatch.remove();
+      } else {
+        if (isSortColumnRangeEnabled || isBucketColumnEnabled) {
+          short rangeNumber = (short) partitioner.getPartition(convertRow);
+          convertRow.setRangeId(rangeNumber);
+        }
+        rowBatch.setPreviousRow(convertRow);
       }
-      rowBatch.setPreviousRow(convertRow);
     }
     rowCounter.getAndAdd(rowBatch.getSize());
     // reuse the origin batch