Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/09/20 04:45:44 UTC

[carbondata] branch master updated: [CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 22342f8  [CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure
22342f8 is described below

commit 22342f847d7db515e5f8c17525522085f49bd2a5
Author: Mahesh Raju Somalaraju <ma...@huawei.com>
AuthorDate: Thu Sep 16 22:41:54 2021 +0530

    [CARBONDATA-4285] Fix alter add complex columns with global sort compaction failure
    
    Why is this PR needed?
    Alter add complex columns followed by global sort compaction fails for two reasons
    (a minimal reproduction sketch follows below):
    
    1. ArrayIndexOutOfBounds exception: the global sort compaction flow builds the default
       complex delimiter list with only 3 entries, but map columns need an extra delimiter
       to separate keys from values.
    2. Bad record handling: when complex columns are added after data has already been
       inserted, those columns hold null values for the previously loaded segments. These
       nulls are treated as bad records, and the compaction fails.
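    
    A minimal sketch of the failing scenario, assuming a CarbonData-enabled SparkSession
    named `spark` (table and column names are illustrative, not taken from this patch):
    
      // Reproduce the pre-fix failure: segments loaded before the ALTER hold nulls for the
      // added map column, and compacting a global_sort table with a map column needs a
      // 4th complex delimiter.
      spark.sql("DROP TABLE IF EXISTS demo_global_sort")
      spark.sql("CREATE TABLE demo_global_sort(id INT) STORED AS carbondata " +
        "TBLPROPERTIES('sort_columns'='id', 'sort_scope'='global_sort')")
      spark.sql("INSERT INTO demo_global_sort VALUES(1)")
      spark.sql("INSERT INTO demo_global_sort VALUES(2)")
      spark.sql("INSERT INTO demo_global_sort VALUES(3)")   // loaded before the new column exists
      spark.sql("ALTER TABLE demo_global_sort ADD COLUMNS(m map<string, string>)")
      spark.sql("INSERT INTO demo_global_sort VALUES(4, map('k', 'v'))")
      // Before this fix, the compaction below failed with the two issues described above.
      spark.sql("ALTER TABLE demo_global_sort COMPACT 'minor'")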
    
    What changes were proposed in this PR?
    1. In the global sort compaction flow, build the default complex delimiter list with
       4 entries, as the load flow already does.
    2. Skip bad record handling in the compaction flow. There is no need to check for bad
       records during compaction: the data was already validated while loading, and
       compaction only re-inserts data from previously loaded segments (a simplified
       sketch of this behaviour follows below).
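    
    A rough illustration of the second change (a simplified sketch, not the actual
    RowConverterImpl code; the helper name and parameters here are made up):
    
      // During compaction, conversion failures are neither logged nor escalated as bad
      // records, because the rows were already validated when their segments were loaded.
      def onConversionFailure(rawRow: String, reason: String,
          isCompactionFlow: Boolean, failOnBadRecord: Boolean): Unit = {
        if (!isCompactionFlow) {
          println(s"bad record: $rawRow ($reason)")   // stands in for the bad record logger
          if (failOnBadRecord) {
            throw new RuntimeException(s"Data load failed due to bad record: $reason")
          }
        }
        // compaction flow: fall through, the value is written as null and processing continues
      }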
    
    This closes #4218
---
 .../hadoop/api/CarbonTableOutputFormat.java        |  9 ++-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 12 +++-
 .../spark/load/DataLoadProcessorStepOnSpark.scala  |  5 +-
 .../spark/rdd/CarbonTableCompactor.scala           |  3 +-
 .../alterTable/TestAlterTableAddColumns.scala      | 80 ++++++++++++++++++++++
 .../processing/loading/BadRecordsLogger.java       |  9 ++-
 .../loading/BadRecordsLoggerProvider.java          | 12 +++-
 .../loading/converter/impl/RowConverterImpl.java   | 14 ++--
 8 files changed, 128 insertions(+), 16 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index ed447a5..12f68d8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -365,11 +365,16 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     if (null == complexDelim) {
       complexDelim = ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_1.value() + ","
           + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_2.value() + ","
-          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value();
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_3.value() + ","
+          + ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value();
     }
     String[] split = complexDelim.split(",");
     model.setComplexDelimiter(split[0]);
-    if (split.length > 2) {
+    if (split.length > 3) {
+      model.setComplexDelimiter(split[1]);
+      model.setComplexDelimiter(split[2]);
+      model.setComplexDelimiter(split[3]);
+    } else if (split.length > 2) {
       model.setComplexDelimiter(split[1]);
       model.setComplexDelimiter(split[2]);
     } else if (split.length > 1) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 83d1890..fe1de5c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -73,7 +73,8 @@ object DataLoadProcessBuilderOnSpark {
       dataFrame: Option[DataFrame],
       model: CarbonLoadModel,
       hadoopConf: Configuration,
-      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
+      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]],
+      isCompactionFlow: Boolean = false)
   : Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     var isLoadFromCSV = false
     val originRDD = if (dataFrame.isDefined) {
@@ -121,8 +122,13 @@ object DataLoadProcessBuilderOnSpark {
     // 2. Convert
     val convertRDD = inputRDD.mapPartitionsWithIndex { case (index, rows) =>
       ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
-      DataLoadProcessorStepOnSpark.convertFunc(rows, index, modelBroadcast, partialSuccessAccum,
-        convertStepRowCounter)
+      DataLoadProcessorStepOnSpark.convertFunc(rows,
+        index,
+        modelBroadcast,
+        partialSuccessAccum,
+        convertStepRowCounter,
+        false,
+        isCompactionFlow)
     }.filter(_ != null) // Filter the bad record
 
     // 3. Sort
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 1694579..8f59200 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -221,10 +221,11 @@ object DataLoadProcessorStepOnSpark {
       modelBroadcast: Broadcast[CarbonLoadModel],
       partialSuccessAccum: LongAccumulator,
       rowCounter: LongAccumulator,
-      keepActualData: Boolean = false): Iterator[CarbonRow] = {
+      keepActualData: Boolean = false,
+      isCompactionFlow: Boolean = false): Iterator[CarbonRow] = {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
-    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf)
+    val badRecordLogger = BadRecordsLoggerProvider.createBadRecordLogger(conf, isCompactionFlow)
     if (keepActualData) {
       conf.getDataFields.foreach(_.setUseActualData(keepActualData))
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index bf3eed3..d39143a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -507,7 +507,8 @@ class CarbonTableCompactor(
         Option(dataFrame),
         outputModel,
         SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
-        segmentMetaDataAccumulator)
+        segmentMetaDataAccumulator,
+        isCompactionFlow = true)
         .map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
         }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
index bd4a112..c74dd13 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableAddColumns.scala
@@ -594,4 +594,84 @@ class TestAlterTableAddColumns extends QueryTest with BeforeAndAfterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, "true")
   }
+
+  test("test the complex columns with global sort compaction") {
+    sql("DROP TABLE IF EXISTS alter_global1")
+    sql("CREATE TABLE alter_global1(intField INT) STORED AS carbondata " +
+        "TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
+    sql("insert into alter_global1 values(1)")
+    sql("insert into alter_global1 values(2)")
+    sql("insert into alter_global1 values(3)")
+    sql( "ALTER TABLE alter_global1 ADD COLUMNS(str1 array<int>)")
+    sql("insert into alter_global1 values(4, array(1))")
+    sql("insert into alter_global1 values(5, null)")
+    sql( "ALTER TABLE alter_global1 ADD COLUMNS(str2 array<string>)")
+    sql("insert into alter_global1 values(6, array(1), array('', 'hi'))")
+    sql("insert into alter_global1 values(7, array(1), array('bye', 'hi'))")
+    sql("ALTER TABLE alter_global1 ADD COLUMNS(str3 array<date>, str4 struct<s1:timestamp>)")
+    sql(
+      "insert into alter_global1 values(8, array(1), array('bye', 'hi'), array('2017-02-01'," +
+      "'2018-09-11'),named_struct('s1', '2017-02-01 00:01:00'))")
+    val expected = Seq(Row(1, null, null, null, null),
+      Row(2, null, null, null, null),
+      Row(3, null, null, null, null),
+      Row(4, make(Array(1)), null, null, null),
+      Row(5, null, null, null, null),
+      Row(6, make(Array(1)), make(Array("", "hi")), null, null),
+      Row(7, make(Array(1)), make(Array("bye", "hi")), null, null),
+      Row(8, make(Array(1)), make(Array("bye", "hi")),
+        make(Array(Date.valueOf("2017-02-01"), Date.valueOf("2018-09-11"))),
+        Row(Timestamp.valueOf("2017-02-01 00:01:00"))))
+    checkAnswer(sql("select * from alter_global1"), expected)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global1")
+    assert(addedColumns.size == 4)
+    sql("alter table alter_global1 compact 'minor'")
+    checkAnswer(sql("select * from alter_global1"), expected)
+    sql("DROP TABLE IF EXISTS alter_global1")
+  }
+
+  test("test the multi-level complex columns with global sort compaction") {
+    sql("DROP TABLE IF EXISTS alter_global2")
+    sql("CREATE TABLE alter_global2(intField INT) STORED AS carbondata " +
+        "TBLPROPERTIES('sort_columns'='intField','sort_scope'='global_sort')")
+    sql("insert into alter_global2 values(1)")
+    // multi-level nested array
+    sql(
+      "ALTER TABLE alter_global2 ADD COLUMNS(arr1 array<array<int>>, arr2 array<struct<a1:string," +
+      "map1:Map<string, string>>>) ")
+    sql(
+      "insert into alter_global2 values(1, array(array(1,2)), array(named_struct('a1','st'," +
+      "'map1', map('a','b'))))")
+    // multi-level nested struct
+    sql("ALTER TABLE alter_global2 ADD COLUMNS(struct1 struct<s1:string, arr: array<int>>," +
+        " struct2 struct<num:double,contact:map<string,array<int>>>) ")
+    sql("insert into alter_global2 values(1, " +
+        "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+        "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+        "array(1,2))))")
+    // multi-level nested map
+    sql(
+      "ALTER TABLE alter_global2 ADD COLUMNS(map1 map<string,array<string>>, map2 map<string," +
+      "struct<d:int, s:struct<im:string>>>)")
+    sql("insert into alter_global2 values(1,  " +
+    "array(array(1,2)), array(named_struct('a1','st','map1', map('a','b'))), " +
+    "named_struct('s1','hi','arr',array(1,2)), named_struct('num',2.3,'contact',map('ph'," +
+    "array(1,2))),map('a',array('hi')), map('a',named_struct('d',23,'s',named_struct('im'," +
+    "'sh'))))")
+    val expected = Seq(Row(1, null, null, null, null, null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        null, null, null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))), null, null),
+      Row(1, make(Array(make(Array(1, 2)))), make(Array(Row("st", Map("a" -> "b")))),
+        Row("hi", make(Array(1, 2))), Row(2.3, Map("ph" -> make(Array(1, 2)))),
+        Map("a" -> make(Array("hi"))), Map("a" -> Row(23, Row("sh"))))
+    )
+    checkAnswer(sql("select * from alter_global2"), expected)
+    val addedColumns = addedColumnsInSchemaEvolutionEntry("alter_global2")
+    assert(addedColumns.size == 6)
+    sql("alter table alter_global2 compact 'minor'")
+    checkAnswer(sql("select * from alter_global2"), expected)
+    sql("DROP TABLE IF EXISTS alter_global2")
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
index 6a939bf..1e54e19 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -88,11 +88,13 @@ public class BadRecordsLogger {
 
   private boolean isDataLoadFail;
 
+  private boolean isCompactionFlow;
+
   // private final Object syncObject =new Object();
 
   public BadRecordsLogger(String key, String fileName, String storePath,
       boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
-      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
+      boolean badRecordConvertNullDisable, boolean isDataLoadFail, boolean isCompactionFlow) {
     // Initially no bad rec
     taskKey = key;
     this.fileName = fileName;
@@ -101,6 +103,11 @@ public class BadRecordsLogger {
     this.badRecordLoggerEnable = badRecordLoggerEnable;
     this.badRecordConvertNullDisable = badRecordConvertNullDisable;
     this.isDataLoadFail = isDataLoadFail;
+    this.isCompactionFlow = isCompactionFlow;
+  }
+
+  public boolean isCompFlow() {
+    return isCompactionFlow;
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
index 25ae1c1..dbc4ff9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLoggerProvider.java
@@ -33,6 +33,16 @@ public class BadRecordsLoggerProvider {
    * @return
    */
   public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
+    return createBadRecordLogger(configuration, false);
+  }
+
+  /**
+   * method returns the BadRecordsLogger instance
+   * @param configuration
+   * @return
+   */
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration,
+      Boolean isCompactionFlow) {
     boolean badRecordsLogRedirect = false;
     boolean badRecordConvertNullDisable = false;
     boolean isDataLoadFail = false;
@@ -72,7 +82,7 @@ public class BadRecordsLoggerProvider {
     return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
         identifier.getTableName() + '_' + System.currentTimeMillis(),
         getBadLogStoreLocation(configuration), badRecordsLogRedirect,
-        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+        badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail, isCompactionFlow);
   }
 
   public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration) {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
index 28bfa91..b5dcb9e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -134,13 +134,15 @@ public class RowConverterImpl implements RowConverter {
         if (reason.equalsIgnoreCase(CarbonCommonConstants.STRING_LENGTH_EXCEEDED_MESSAGE)) {
           reason = String.format(reason, this.fields[i].getColumn().getColName());
         }
-        badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
-        if (badRecordLogger.isDataLoadFail()) {
-          String error = "Data load failed due to bad record: " + reason;
-          if (!badRecordLogger.isBadRecordLoggerEnable()) {
-            error += "Please enable bad record logger to know the detail reason.";
+        if (!badRecordLogger.isCompFlow()) {
+          badRecordLogger.addBadRecordsToBuilder(row.getRawData(), reason);
+          if (badRecordLogger.isDataLoadFail()) {
+            String error = "Data load failed due to bad record: " + reason;
+            if (!badRecordLogger.isBadRecordLoggerEnable()) {
+              error += "Please enable bad record logger to know the detail reason.";
+            }
+            throw new BadRecordFoundException(error);
           }
-          throw new BadRecordFoundException(error);
         }
         logHolder.clear();
         logHolder.setLogged(true);