You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/08/01 09:18:52 UTC

[carbondata] branch master updated: [CARBONDATA-3478]Fix ArrayIndexOutOfBound Exception on compaction after alter operation

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c8cc92b  [CARBONDATA-3478]Fix ArrayIndexOutOfBound Exception on compaction after alter operation
c8cc92b is described below

commit c8cc92b6db80e7b6d220901e65ddf1f5beb557fb
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Jul 26 16:51:32 2019 +0530

    [CARBONDATA-3478]Fix ArrayIndexOutOfBound Exception on compaction after alter operation
    
    Problem:
    After an alter add, drop, or rename operation, restructuredBlockExists will be true.
    Currently, to get the RawResultIterator for a block, we check whether the block has
    column drift by comparing its SegmentProperties with the column-drift columns.
    SegmentProperties is formed based on restructuredBlockExists:
    if restructuredBlockExists is true, we use the current column schema to form SegmentProperties;
    otherwise, we use the column schema from the data file footer to form SegmentProperties.
    
    In the example given in CARBONDATA-3478, we use the current column schema to form
    SegmentProperties for both blocks, because restructuredBlockExists is true.
    Hence, while iterating over block 1, an ArrayIndexOutOfBounds exception is thrown,
    because RawResultIterator is used instead of ColumnDriftRawResultIterator.
    
    Solution:
    Use the schema from each block's data file footer to check whether the block has column drift.
    
    This closes #3337
---
 .../AlterTableColumnRenameTestCase.scala           | 54 ++++++++++++++++++++++
 .../merger/CarbonCompactionExecutor.java           |  9 +++-
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
index d927724..dd1fa0f 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AlterTableColumnRenameTestCase.scala
@@ -320,12 +320,66 @@ class AlterTableColumnRenameTestCase extends Spark2QueryTest with BeforeAndAfter
     }
   }
 
+  test("test compaction after table rename and alter set tblproerties") {
+    sql("DROP TABLE IF EXISTS test_rename")
+    sql("DROP TABLE IF EXISTS test_rename_compact")
+    sql(
+      "CREATE TABLE test_rename (empno int, empname String, designation String, doj Timestamp, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE test_rename OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql("alter table test_rename rename to test_rename_compact")
+    sql("alter table test_rename_compact set tblproperties('sort_columns'='deptno,projectcode', 'sort_scope'='local_sort')")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE test_rename_compact OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    val res1 = sql("select * from test_rename_compact")
+    sql("alter table test_rename_compact compact 'major'")
+    val res2 = sql("select * from test_rename_compact")
+    assert(res1.collectAsList().containsAll(res2.collectAsList()))
+    checkExistence(sql("show segments for table test_rename_compact"), true, "Compacted")
+    sql("DROP TABLE IF EXISTS test_rename")
+    sql("DROP TABLE IF EXISTS test_rename_compact")
+  }
+
+  test("test compaction after alter set tblproerties- add and drop") {
+    sql("DROP TABLE IF EXISTS test_alter")
+    sql(
+      "CREATE TABLE test_alter (empno int, empname String, designation String, doj Timestamp, " +
+      "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+      "projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int," +
+      "utilization int,salary int) STORED BY 'org.apache.carbondata.format'")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE test_alter OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql("alter table test_alter set tblproperties('sort_columns'='deptno,projectcode', 'sort_scope'='local_sort')")
+    sql("alter table test_alter drop columns(deptno)")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE test_alter OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    sql("alter table test_alter add columns(deptno int)")
+    sql(
+      s"""LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE test_alter OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '\"')""".stripMargin)
+    val res1 = sql("select * from test_alter")
+    sql("alter table test_alter compact 'major'")
+    val res2 = sql("select * from test_alter")
+    assert(res1.collectAsList().containsAll(res2.collectAsList()))
+    sql("DROP TABLE IF EXISTS test_alter")
+  }
+
   override def afterAll(): Unit = {
     dropTable()
   }
 
   def dropTable(): Unit = {
     sql("DROP TABLE IF EXISTS RENAME")
+    sql("DROP TABLE IF EXISTS test_rename")
+    sql("DROP TABLE IF EXISTS test_rename_compact")
+    sql("DROP TABLE IF EXISTS test_alter")
   }
 
   def createTableAndLoad(): Unit = {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
index 28f1cf4..d7769bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -177,8 +177,9 @@ public class CarbonCompactionExecutor {
   private RawResultIterator getRawResultIterator(Configuration configuration, String segmentId,
       String task, List<TableBlockInfo> tableBlockInfoList)
       throws QueryExecutionException, IOException {
-    SegmentProperties sourceSegmentProperties = getSourceSegmentProperties(
-        Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter()));
+    SegmentProperties sourceSegmentProperties =
+        new SegmentProperties(tableBlockInfoList.get(0).getDataFileFooter().getColumnInTable(),
+            tableBlockInfoList.get(0).getDataFileFooter().getSegmentInfo().getColumnCardinality());
     boolean hasColumnDrift = carbonTable.hasColumnDrift() &&
         RestructureUtil.hasColumnDriftOnSegment(carbonTable, sourceSegmentProperties);
     if (hasColumnDrift) {
@@ -186,6 +187,10 @@ public class CarbonCompactionExecutor {
           executeBlockList(tableBlockInfoList, segmentId, task, configuration),
           sourceSegmentProperties, destinationSegProperties);
     } else {
+      if (restructuredBlockExists) {
+        sourceSegmentProperties = getSourceSegmentProperties(
+            Collections.singletonList(tableBlockInfoList.get(0).getDataFileFooter()));
+      }
       return new RawResultIterator(
           executeBlockList(tableBlockInfoList, segmentId, task, configuration),
           sourceSegmentProperties, destinationSegProperties, true);