You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/09/15 12:09:53 UTC

[carbondata] branch master updated: [CARBONDATA-3980] Load fails with aborted exception when Bad records action is unspecified

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b121ac6  [CARBONDATA-3980] Load fails with aborted exception when Bad records action is unspecified
b121ac6 is described below

commit b121ac6c18d03d30a22974bad7504870dd80b41a
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Thu Sep 10 16:30:05 2020 +0530

    [CARBONDATA-3980] Load fails with aborted exception when Bad records action is unspecified
    
    Why is this PR needed?
    Load fails with aborted exception when Bad records action is unspecified.
    
    When the partition column is loaded with a bad record value, load fails with 'Job aborted' message in cluster. However in complete stack trace we can see the actual error message. (Like, 'Data load failed due to bad record: The value with column name projectjoindate and column data type TIMESTAMP is not a valid TIMESTAMP type')
    
    What changes were proposed in this PR?
    Fix bad record error message for the partition column. Added the error message to operationContext map and if its not null throwing exception with errorMessage from CarbonLoadDataCommand.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3919
---
 .../command/management/CarbonLoadDataCommand.scala    |  7 ++++++-
 .../command/management/CommonLoadUtils.scala          |  1 +
 .../StandardPartitionBadRecordLoggerTest.scala        | 19 +++++++++++++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index b17969b..d5c3c84 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -191,7 +191,12 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
         if (isUpdateTableStatusRequired) {
           CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
-        throw ex
+        val errorMessage = operationContext.getProperty("Error message")
+        if (errorMessage != null) {
+          throw new RuntimeException(errorMessage.toString, ex.getCause)
+        } else {
+          throw ex
+        }
     }
     Seq.empty
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index f574e12..5c46127 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -1064,6 +1064,7 @@ object CommonLoadUtils {
         if (loadParams.updateModel.isDefined) {
           CarbonScalaUtil.updateErrorInUpdateModel(loadParams.updateModel.get, executorMessage)
         }
+        loadParams.operationContext.setProperty("Error message", errorMessage)
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
         throw ex
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
index c19c51e..488291d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionBadRecordLoggerTest.scala
@@ -219,6 +219,25 @@ class StandardPartitionBadRecordLoggerTest extends QueryTest with BeforeAndAfter
     }
   }
 
+  test("test load with partition column having bad record value") {
+    sql("drop table if exists dataloadOptionTests")
+    sql("CREATE TABLE dataloadOptionTests (empno int, empname String, designation String, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, projectjoindate " +
+      "Timestamp, projectenddate Date,attendance int,utilization int,salary int) PARTITIONED BY " +
+      "(deptname String,doj Timestamp,projectcode int) STORED AS carbondata ")
+    val csvFilePath = s"$resourcesPath/data.csv"
+    val ex = intercept[RuntimeException] {
+      sql("LOAD DATA local inpath '" + csvFilePath +
+          "' INTO TABLE dataloadOptionTests OPTIONS ('bad_records_action'='FAIL', 'DELIMITER'= '," +
+          "', 'QUOTECHAR'= '\"', 'dateformat'='DD-MM-YYYY','timestampformat'='DD-MM-YYYY')")
+    }
+    assert(ex.getMessage.contains(
+      "DataLoad failure: Data load failed due to bad record: The value with column name " +
+      "projectjoindate and column data type TIMESTAMP is not a valid TIMESTAMP type.Please " +
+      "enable bad record logger to know the detail reason."))
+    sql("drop table dataloadOptionTests")
+  }
+
   def drop(): Unit = {
     sql("drop table IF EXISTS sales")
     sql("drop table IF EXISTS serializable_values")