You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/01/27 11:04:10 UTC

[carbondata] branch master updated: [CARBONDATA-3664]Add SchemaEvolutionEntry after alter set sort columns

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ea111d6  [CARBONDATA-3664]Add SchemaEvolutionEntry after alter set sort columns
ea111d6 is described below

commit ea111d677efa422d4ad2d76e0d786f3e6fecf5cb
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Tue Jan 14 13:16:33 2020 +0530

    [CARBONDATA-3664]Add SchemaEvolutionEntry after alter set sort columns
    
    Why is this PR needed?
    Altering a table to set sort columns changes the schema, but no schema evolution entry is
    recorded for that change.
    
    What changes were proposed in this PR?
    Setting sort columns changes the schema by reordering the columns, so create a schema
    evolution entry for that change.
    
    This closes #3579
---
 .../CarbonAlterTableColRenameDataTypeChangeCommand.scala  |  7 +++----
 .../main/scala/org/apache/spark/util/AlterTableUtil.scala | 15 ++++++++++-----
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 16025fb..a26fac4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -181,7 +181,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
       var deletedColumnSchema: ColumnSchema = null
-      val schemaEvolutionEntry: SchemaEvolutionEntry = null
+      var schemaEvolutionEntry: SchemaEvolutionEntry = null
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible)
 
       columnSchemaList.foreach { columnSchema =>
@@ -208,9 +208,8 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
           addColumnSchema = columnSchema
           timeStamp = System.currentTimeMillis()
           // make a new schema evolution entry after column rename or datatype change
-          AlterTableUtil
-            .addNewSchemaEvolutionEntry(schemaEvolutionEntry, timeStamp, addColumnSchema,
-              deletedColumnSchema)
+          schemaEvolutionEntry = AlterTableUtil
+            .addNewSchemaEvolutionEntry(timeStamp, addColumnSchema, deletedColumnSchema)
         }
       }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index c210161..f314e74 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -109,7 +109,8 @@ object AlterTableUtil {
       thriftTable: TableInfo,
       lowerCasePropertiesMap: mutable.Map[String, String],
       schemaConverter: SchemaConverter
-  ) = {
+  ): SchemaEvolutionEntry = {
+    var schemaEvolutionEntry: SchemaEvolutionEntry = null
     val sortColumnsOption = lowerCasePropertiesMap.get(CarbonCommonConstants.SORT_COLUMNS)
     if (sortColumnsOption.isDefined) {
       val sortColumnsString = CarbonUtil.unquoteChar(sortColumnsOption.get).trim
@@ -168,8 +169,10 @@ object AlterTableUtil {
         // use new columns
         columns.clear()
         columns.addAll(newColumns)
+        schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis())
       }
     }
+    schemaEvolutionEntry
   }
 
   /**
@@ -379,17 +382,15 @@ object AlterTableUtil {
   /**
    * This method create a new SchemaEvolutionEntry and adds to SchemaEvolutionEntry List
    *
-   * @param schemaEvolutionEntry List to add new SchemaEvolutionEntry
    * @param addColumnSchema          added new column schema
    * @param deletedColumnSchema      old column schema which is deleted
    * @return
    */
   def addNewSchemaEvolutionEntry(
-      schemaEvolutionEntry: SchemaEvolutionEntry,
       timeStamp: Long,
       addColumnSchema: org.apache.carbondata.format.ColumnSchema,
       deletedColumnSchema: org.apache.carbondata.format.ColumnSchema): SchemaEvolutionEntry = {
-    var schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
+    val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp)
     schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
     schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
     schemaEvolutionEntry
@@ -458,7 +459,10 @@ object AlterTableUtil {
       validateCompactionLevelThresholdProperties(carbonTable, lowerCasePropertiesMap)
 
       // if SORT_COLUMN is changed, it will move them to the head of column list
-      updateSchemaForSortColumns(thriftTable, lowerCasePropertiesMap, schemaConverter)
+      // Create a schema evolution entry, because altering the sort columns changes the schema
+      // by reordering the columns
+      val schemaEvolutionEntry = updateSchemaForSortColumns(thriftTable,
+        lowerCasePropertiesMap, schemaConverter)
       // validate long string columns
       val longStringColumns = lowerCasePropertiesMap.get("long_string_columns");
       if (longStringColumns.isDefined) {
@@ -522,6 +526,7 @@ object AlterTableUtil {
       }
       val (tableIdentifier, schemParts) = updateSchemaInfo(
         carbonTable = carbonTable,
+        schemaEvolutionEntry,
         thriftTable = thriftTable)(sparkSession)
       CarbonSessionCatalogUtil.alterTable(tableIdentifier, schemParts, None, sparkSession)
       CarbonSessionCatalogUtil.alterTableProperties(