You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2021/04/27 15:41:35 UTC

[carbondata] branch master updated: [CARBONDATA-4170] Support dropping of parent complex columns(array/struct/map)

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f93479  [CARBONDATA-4170] Support dropping of parent complex columns(array/struct/map)
2f93479 is described below

commit 2f9347961d9135af4a3ffe37d6bdcc1c7260ebc4
Author: akkio-97 <ak...@gmail.com>
AuthorDate: Wed Mar 31 20:41:26 2021 +0530

    [CARBONDATA-4170] Support dropping of parent complex columns(array/struct/map)
    
    Why is this PR needed?
    This PR supports dropping of parent complex columns (single and multi-level)
    from the carbon table. Dropping a parent column will in turn drop all of
    its child columns too.
    
    What changes were proposed in this PR?
    Child columns are prefixed with their parent column name, so the identified
    columns are added to the delete-column-list and the schema is updated based
    on that. Test cases have been written for up to 3 levels.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4121
---
 docs/ddl-of-carbondata.md                          |   6 +-
 .../secondaryindex/TestSIWithSecondaryIndex.scala  |  44 +++---
 .../schema/CarbonAlterTableDropColumnCommand.scala |  29 ++--
 .../events/AlterTableDropColumnEventListener.scala |  25 +---
 .../vectorreader/DropColumnTestCases.scala         | 164 ++++++++++++++++++++-
 5 files changed, 211 insertions(+), 57 deletions(-)

diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 4c9e957..50ddf51 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -799,7 +799,11 @@ Users can specify which columns to include and exclude for local dictionary gene
      ALTER TABLE carbon DROP COLUMNS (c1,d1)
      ```
 
-     **NOTE:** Drop Complex child column is not supported.
+     **NOTE:** 
+     1. Drop Complex child column is not supported.
+     
+     2. If a column to be dropped has a Secondary Index (SI) table created on it, the drop column operation fails and the user is
+     asked to drop the corresponding SI table first, before retrying the drop column operation.
 
    - #### CHANGE COLUMN NAME/TYPE/COMMENT
    
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
index a5c1d34..cecab4c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondaryIndex.scala
@@ -62,27 +62,33 @@ class TestSIWithSecondaryIndex extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test ("test alter drop all columns of the SI table") {
-    sql("create table table_drop_columns (" +
-        "name string, id string, country string) stored as carbondata")
-    sql("insert into table_drop_columns select 'xx', '1', 'china'")
-    sql("create index tdc_index_1 on table table_drop_columns(id, country) as 'carbondata'")
-    // alter table to drop all the columns used in index
-    sql("alter table table_drop_columns drop columns(id, country)")
-    sql("insert into table_drop_columns select 'xy'")
-    assert(sql("show indexes on table_drop_columns").collect().isEmpty)
-  }
-
-  test ("test alter drop few columns of the SI table") {
+  test("test alter drop columns of the SI table") {
     sql("create table table_drop_columns_fail (" +
         "name string, id string, country string) stored as carbondata")
-    sql("insert into table_drop_columns_fail select 'xx', '1', 'china'")
-    sql("create index tdcf_index_1 on table table_drop_columns_fail(id, country) as 'carbondata'")
-    // alter table to drop few columns used in index. This should fail as we are not dropping all
-    // the index columns
-    assert(intercept[ProcessMetaDataException](sql(
-      "alter table table_drop_columns_fail drop columns(id)")).getMessage
-      .contains("Alter table drop column operation failed:"))
+    sql("ALTER TABLE table_drop_columns_fail ADD COLUMNS(arr1 array<string>)")
+    sql("insert into table_drop_columns_fail values( 'xx', '1', 'china', array('hello', 'world') )")
+    sql("drop index if exists tdcf_index_1 on table_drop_columns_fail")
+    sql("create index tdcf_index_1 on table table_drop_columns_fail(arr1,country) as 'carbondata'")
+    sql("drop index if exists tdcf_index_2 on table_drop_columns_fail")
+    sql("create index tdcf_index_2 on table table_drop_columns_fail(id,name) as 'carbondata'")
+    val exception1 = intercept[Exception] {
+      sql("ALTER TABLE table_drop_columns_fail DROP COLUMNS(arr1) ")
+    }
+    val exceptionMessage1 =
+      "operation failed for default.table_drop_columns_fail: Alter table drop column operation " +
+      "failed: The provided column(s) are present in index table. Please drop the index table " +
+      "[tdcf_index_1] first and then retry the drop column operation"
+    assert(exception1.getMessage.contains(exceptionMessage1))
+    val exception2 = intercept[Exception] {
+      sql("ALTER TABLE table_drop_columns_fail DROP COLUMNS(id,name) ")
+    }
+    val exceptionMessage2 =
+      "operation failed for default.table_drop_columns_fail: Alter table drop column operation " +
+      "failed: The provided column(s) are present in index table. Please drop the index table " +
+      "[tdcf_index_2] first and then retry the drop column operation"
+    assert(exception2.getMessage.contains(exceptionMessage2))
+    val columns = sql("desc table table_drop_columns_fail").collect()
+    assert(columns.size == 4)
   }
 
   test("test create secondary index global sort after insert") {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 865b1ea..c463091 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -120,11 +121,6 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
                 dictionaryColumns ++= Seq(tableColumn.getColumnSchema)
               }
             }
-            // Check if column to be dropped is of complex dataType
-            if (tableColumn.getDataType.isComplexType) {
-              val errMsg = "Complex column cannot be dropped"
-              throw new MalformedCarbonCommandException(errMsg)
-            }
             columnExist = true
           }
         }
@@ -145,21 +141,34 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       // read the latest schema file
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTable)
-      // maintain the deleted columns for schema evolution history
+      // deletedColumnSchema holds the dropped columns plus, for complex types, their child
+      // columns; deletedTableColumns holds only the dropped (parent) columns, which are the
+      // ones recorded in the schemaEvolution entry
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
+      var deletedTableColumns = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala
       alterTableDropColumnModel.columns.foreach { column =>
         columnSchemaList.foreach { columnSchema =>
-          if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) {
-            deletedColumnSchema += columnSchema.deepCopy
-            columnSchema.invisible = true
+          if (!columnSchema.invisible) {
+            if (column.equalsIgnoreCase(columnSchema.column_name)) {
+              val columnSchemaCopy = columnSchema.deepCopy
+              deletedTableColumns += columnSchemaCopy
+              deletedColumnSchema += columnSchemaCopy
+              columnSchema.invisible = true
+            } else if (columnSchema.column_name.toLowerCase
+              .startsWith(column + CarbonCommonConstants.POINT)) {
+              // if the column to be dropped is of complex type then its children are prefixed
+              // with -> parent_name + '.'
+              deletedColumnSchema += columnSchema.deepCopy
+              columnSchema.invisible = true
+            }
           }
         }
       }
       // add deleted columns to schema evolution history and update the schema
       timeStamp = System.currentTimeMillis
       val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
-      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
+      schemaEvolutionEntry.setRemoved(deletedTableColumns.toList.asJava)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
       val delCols = deletedColumnSchema.map { deleteCols =>
         schemaConverter.fromExternalToWrapperColumnSchema(deleteCols)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
index 08f421d..0559aa4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
@@ -48,19 +48,18 @@ class AlterTableDropColumnEventListener extends OperationEventListener {
         val tablePath = carbonTable.getTablePath
         val sparkSession = alterTableDropColumnPreEvent.sparkSession
         val alterTableDropColumnModel = alterTableDropColumnPreEvent.alterTableDropColumnModel
-        dropApplicableSITables(dbName,
+        checkIfDropColumnExistsInSI(dbName,
           tableName,
           tablePath,
           alterTableDropColumnModel)(sparkSession)
     }
   }
 
-  private def dropApplicableSITables(dbName: String,
+  private def checkIfDropColumnExistsInSI(dbName: String,
       tableName: String,
       tablePath: String,
       alterTableDropColumnModel: AlterTableDropColumnModel)
     (sparkSession: SparkSession) {
-    var indexTableToDrop: Seq[String] = Seq.empty
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     val parentCarbonTable = catalog.lookupRelation(Some(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation].carbonTable
@@ -74,23 +73,11 @@ class AlterTableDropColumnEventListener extends OperationEventListener {
       val indexColumns = indexTable._2.asScala(CarbonCommonConstants.INDEX_COLUMNS).split(",")
       val colSize = alterTableDropColumnModel.columns.intersect(indexColumns).size
       if (colSize > 0) {
-        if (colSize == indexColumns.size) {
-          indexTableToDrop ++= Seq(indexTable._1)
-        } else {
-          sys
-            .error(s"Index Table [${
-              indexTable._1
-            }] exists with combination of provided drop column(s) and other columns, drop " +
-                   s"index table & retry")
-        }
+        sys
+          .error(s"The provided column(s) are present in index table. Please drop the " +
+                 s"index table [${ indexTable._1 }] first and then retry the drop column operation")
+
       }
     })
-    indexTableToDrop.foreach { indexTable =>
-      DropIndexCommand(ifExistsSet = true,
-        Some(dbName),
-        tableName,
-        indexTable.toLowerCase,
-        needLock = false).run(sparkSession)
-    }
   }
 }
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
index b16c676..55f4ffd 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
@@ -17,11 +17,16 @@
 
 package org.apache.spark.carbondata.restructure.vectorreader
 
-import org.apache.spark.sql.Row
+import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConverters
+
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class DropColumnTestCases extends QueryTest with BeforeAndAfterAll {
@@ -97,13 +102,156 @@ class DropColumnTestCases extends QueryTest with BeforeAndAfterAll {
     test_drop_and_compaction()
   }
 
-  test("test dropping of complex column should throw exception") {
-    sql("drop table if exists maintbl")
-    sql("create table maintbl (a string, b string, c struct<si:int>) STORED AS carbondata")
-    assert(intercept[ProcessMetaDataException] {
-      sql("alter table maintbl drop columns(b,c)").collect()
-    }.getMessage.contains("Complex column cannot be dropped"))
-    sql("drop table if exists maintbl")
+  def checkSchemaSize(value: Integer): Unit = {
+    val schema = sql("describe alter_com").collect()
+    assert(schema.size.equals(value))
+  }
+
+  def checkDroppedColumnsInSchemaEvolutionEntry(tableName: String, value: Integer): Unit = {
+    val carbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+    val schemaEvolutionList = carbonTable.getTableInfo
+      .getFactTable
+      .getSchemaEvolution()
+      .getSchemaEvolutionEntryList()
+    var droppedColumns = Seq[ColumnSchema]()
+    for (i <- 0 until schemaEvolutionList.size()) {
+      droppedColumns ++=
+      JavaConverters
+        .asScalaIteratorConverter(schemaEvolutionList.get(i).getRemoved.iterator())
+        .asScala
+        .toSeq
+    }
+    assert(droppedColumns.size.equals(value))
+  }
+
+  test("test dropping of array of all primitive types") {
+    import scala.collection.mutable.WrappedArray.make
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("CREATE TABLE alter_com(intfield int, arr array<int>, arr1 array<short>, " +
+        "arr2 array<int>, arr3 array<long>, arr4 array<double>, arr5 array<decimal(8,2)>, " +
+        "arr6 array<string>, arr7 array<char(5)>, arr8 array<varchar(50)>, arr9 array<boolean>, " +
+        "arr10 array<date>, arr11 array<timestamp>) STORED AS carbondata")
+    sql("insert into alter_com values(1,array(1,5),array(1,5),array(1,2),array(1,2,3)," +
+        "array(1.2d,2.3d),array(4.5,6.7),array('hello','world'),array('a','bcd')," +
+        "array('abcd','efg'),array(true,false),array('2017-02-01','2018-09-11')," +
+        "array('2017-02-01 00:01:00','2018-02-01 02:21:00') )")
+    sql("ALTER TABLE alter_com DROP COLUMNS(arr1,arr2,arr3,arr4,arr5,arr6) ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(arr7,arr8,arr9) ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(arr10,arr11) ")
+    val exception = intercept[Exception] {
+      sql("ALTER TABLE alter_com DROP COLUMNS(arr10,arr10) ")
+    }
+    val exceptionMessage =
+      "arr10 is duplicate. Duplicate columns not allowed"
+    assert(exception.getMessage.contains(exceptionMessage))
+
+    checkSchemaSize(2)
+    checkAnswer(sql("select * from alter_com"), Seq(Row(1, make(Array(1, 5)))))
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 11)
+    // check adding columns with same names again
+    sql(
+      "ALTER TABLE alter_com ADD COLUMNS(arr1 array<short>, arr2 array<int>, arr3 " +
+      "array<long>, arr4 array<double>, arr5 array<decimal(8,2)>, arr6 array<string>, arr7 " +
+      "array<char(5)>, arr8 array<varchar(50)>, arr9 array<boolean>, arr10 array<date>, arr11 " +
+      "array<timestamp> )")
+    val columns = sql("desc table alter_com").collect()
+    assert(columns.size == 13)
+    sql(
+      "insert into alter_com values(2,array(2,5),array(2,5),array(2,2),array(2,2,3),array(2.2d," +
+      "2.3d),array(2.5,6.7),array('hello2','world'),array('a2','bcd'),array('abcd2','efg'),array" +
+      "(true,false), array('2017-02-01','2018-09-11'),array('2017-02-01 00:01:00','2018-02-01 " +
+      "02:21:00') )")
+    checkAnswer(sql(
+      "select * from alter_com"),
+      Seq(Row(1, make(Array(1, 5)), null, null, null, null, null, null, null, null, null, null,
+        null), Row(2,
+        make(Array(2, 5)),
+        make(Array(2, 5)),
+        make(Array(2, 2)),
+        make(Array(2, 2, 3)),
+        make(Array(2.2, 2.3)),
+        make(Array(java.math.BigDecimal.valueOf(2.5).setScale(2),
+          java.math.BigDecimal.valueOf(6.7).setScale(2))),
+        make(Array("hello2", "world")),
+        make(Array("a2", "bcd")),
+        make(Array("abcd2", "efg")),
+        make(Array(true, false)),
+        make(Array(Date.valueOf("2017-02-01"),
+          Date.valueOf("2018-09-11"))),
+        make(Array(Timestamp.valueOf("2017-02-01 00:01:00"),
+          Timestamp.valueOf("2018-02-01 02:21:00")))
+      )))
+  }
+
+  test("test dropping of struct of all primitive types") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql("CREATE TABLE alter_com(intField INT,struct1 struct<a:short,b:int,c:long,d:double," +
+        "e:decimal(8,2),f:string,g:char(5),h:varchar(50),i:boolean,j:date,k:timestamp>) " +
+        "STORED AS carbondata")
+    sql("insert into alter_com values(1, named_struct('a',1,'b',2,'c',3,'d',1.23,'e',2.34,'f'," +
+        "'hello','g','abc','h','def','i',true,'j','2017-02-01','k','2018-02-01 02:00:00.0') ) ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(struct1) ")
+    checkSchemaSize(1)
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 1)
+    // check adding column with same name again
+    sql("ALTER TABLE alter_com ADD COLUMNS(struct1 struct<a:short,b:int,c:long,d:double, " +
+        "e:decimal(8,2),f:string,g:char(5),h:varchar(50),i:boolean,j:date,k:timestamp>)")
+    checkSchemaSize(2)
+    sql("insert into alter_com values(2, named_struct('a',1,'b',2,'c',3,'d',1.23,'e',2.34,'f'," +
+        "'hello','g','abc','h','def','i',true,'j','2017-02-01','k','2018-02-01 02:00:00.0') ) ")
+    checkAnswer(sql("select struct1 from alter_com"),
+      Seq(Row(Row(1, 2, 3, 1.23, java.math.BigDecimal.valueOf(2.34).setScale(2), "hello", "abc",
+        "def", true, Date.valueOf("2017-02-01"), Timestamp.valueOf("2018-02-01 02:00:00.0"))),
+        Row(null)))
+  }
+
+  test("test dropping of map of all primitive types") {
+    import scala.collection.mutable.WrappedArray.make
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql(
+      "CREATE TABLE alter_com(intField INT,arr array<int>, map1 map<int,long>, map2 map<short," +
+      "double>, map3 map<decimal(8,2),string>, map4 map<char(5),varchar(50)>,map5 map<boolean," +
+      "date>, map6 map<int, timestamp>) STORED AS carbondata")
+    sql("insert into alter_com values(1,array(1,2),map(2,3),map(3,4.5d),map((cast(\"1.2\" as " +
+        "decimal(8,2))),'hello'),map('abc','hello world'),map(true,'2017-02-01'),map(1," +
+        "'2017-02-01 00:01:00') ) ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(map1,map2,map3,map4,map5,map6) ")
+    checkSchemaSize(2)
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 6)
+    checkAnswer(sql("select * from alter_com"), Seq(Row(1, make(Array(1, 2)))))
+  }
+
+  test("Test alter drop for multi-level array") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql(
+      "CREATE TABLE alter_com(intField INT, arr1 array<struct<a:int, b:string>> comment 'arr1 " +
+      "comment', arr2 array<struct<a:int, b:array<long>>>, arr3 array<array<string>> comment " +
+      "'arr3 comment', arr4 array<array<array<string>>> ) STORED AS carbondata ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(arr1, arr2, arr3, arr4) ")
+    checkSchemaSize(1)
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 4)
+  }
+
+  test("Test alter drop for multi-level STRUCT") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql(
+      "CREATE TABLE alter_com(struc struct<a:string,b:int>, struct1 struct<a:int," +
+      "b:array<struct<id:int, name:string>>>, struct2 struct<a:struct<b:array<int>,c:int>," +
+      "d:string> ) STORED AS carbondata ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(struct1,struct2) ")
+    checkSchemaSize(1)
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 2)
+  }
+
+  test("Test alter drop for multi-level MAP") {
+    sql("DROP TABLE IF EXISTS alter_com")
+    sql(
+      "CREATE TABLE alter_com(arr array<int>, map1 map<int,array<int>>, map2 map<string," +
+      "array<map<int,int>>>, map3 map<int, struct<a:map<int,int>,b:string>>, map4 map<int," +
+      "map<int,map<int,int>>> ) STORED AS carbondata ")
+    sql("ALTER TABLE alter_com DROP COLUMNS(map1,map2,map3,map4) ")
+    checkSchemaSize(1)
+    checkDroppedColumnsInSchemaEvolutionEntry("alter_com", 4)
   }
 
   override def afterAll {