You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/04/28 04:56:46 UTC

[carbondata] branch master updated: [CARBONDATA-3783]Alter table drop column on main table is not dropping the eligible secondary index tables

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f1ff001  [CARBONDATA-3783]Alter table drop column on main table is not dropping the eligible secondary index tables
f1ff001 is described below

commit f1ff001ea82109e37a95bd63971119d59e028c1c
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Apr 23 15:15:13 2020 +0530

    [CARBONDATA-3783]Alter table drop column on main table is not dropping the eligible secondary index tables
    
    Why is this PR needed?
    Alter table drop column on main table when droping columns match to a secondary index table columns is actually
    not dropping secondary index table.
    
    What changes were proposed in this PR?
    1. Have corrected the dropApplicableSITables to check if the drop columns are matching the index table columns.
    If all columns of a index table match to drop columns, then drop the index table itself.
    
    2. Index table refresh was missed in CarbonCreateSecondaryIndexCommand. It was missed in secondary index feature PR.
    
    This closes #3725
---
 .../secondaryindex/TestSIWithSecondryIndex.scala   | 27 +++++++++++++++
 .../execution/command/index/DropIndexCommand.scala |  9 +++--
 .../spark/sql/execution/strategy/DDLStrategy.scala |  3 +-
 .../secondaryindex/command/SICreationCommand.scala |  8 +++++
 .../events/AlterTableDropColumnEventListener.scala | 39 +++++++++++-----------
 5 files changed, 62 insertions(+), 24 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 8f2c8fb..9e6ce9b 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -28,6 +28,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
 import org.apache.spark.sql.test.util.QueryTest
 
 class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
@@ -35,6 +37,8 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("drop index if exists si_altercolumn on table_WithSIAndAlter")
     sql("drop table if exists table_WithSIAndAlter")
+    sql("drop table if exists table_drop_columns")
+    sql("drop table if exists table_drop_columns_fail")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -61,6 +65,27 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test ("test alter drop all columns of the SI table") {
+    sql("create table table_drop_columns (name string, id string, country string) stored as carbondata")
+    sql("insert into table_drop_columns select 'xx', '1', 'china'")
+    sql("create index index_1 on table table_drop_columns(id, country) as 'carbondata'")
+    // alter table to drop all the columns used in index
+    sql("alter table table_drop_columns drop columns(id, country)")
+    sql("insert into table_drop_columns select 'xy'")
+    assert(sql("show indexes on table_drop_columns").collect().isEmpty)
+  }
+
+  test ("test alter drop few columns of the SI table") {
+    sql("create table table_drop_columns_fail (name string, id string, country string) stored as carbondata")
+    sql("insert into table_drop_columns_fail select 'xx', '1', 'china'")
+    sql("create index index_1 on table table_drop_columns_fail(id, country) as 'carbondata'")
+    // alter table to drop few columns used in index. This should fail as we are not dropping all
+    // the index columns
+    assert(intercept[ProcessMetaDataException](sql(
+      "alter table table_drop_columns_fail drop columns(id)")).getMessage
+      .contains("Alter table drop column operation failed:"))
+  }
+
   test("Test secondry index data count") {
     checkAnswer(sql("select count(*) from si_altercolumn")
       ,Seq(Row(1)))
@@ -229,5 +254,7 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists column_meta_cache")
     sql("drop table if exists uniqdata")
     sql("drop table if exists uniqdataTable")
+    sql("drop table if exists table_drop_columns")
+    sql("drop table if exists table_drop_columns_fail")
   }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
index ce730a9..d87cf0a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/DropIndexCommand.scala
@@ -42,7 +42,8 @@ private[sql] case class DropIndexCommand(
     ifExistsSet: Boolean,
     dbNameOp: Option[String],
     parentTableName: String = null,
-    indexName: String)
+    indexName: String,
+    needLock: Boolean = true)
   extends RunnableCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -74,7 +75,11 @@ private[sql] case class DropIndexCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(dbNameOp)(sparkSession)
     var tableIdentifierForAcquiringLock: AbsoluteTableIdentifier = null
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+    val locksToBeAcquired = if (needLock) {
+      List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+    } else {
+      List.empty
+    }
     val catalog = CarbonEnv.getInstance(sparkSession).carbonMetaStore
     // flag to check if folders and files can be successfully deleted
     var isValidDeletion = false
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index a26d728..bf6fefd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand, MatchResetCommand}
 import org.apache.spark.sql.secondaryindex.command.CarbonCreateSecondaryIndexCommand
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 
 /**
@@ -230,7 +229,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           case c: Exception =>
             sys.error("Operation not allowed on non-carbon table")
         }
-      case dropIndex@DropIndexCommand(ifExistsSet, databaseNameOp, parentTableName, tableName) =>
+      case dropIndex@DropIndexCommand(ifExistsSet, databaseNameOp, parentTableName, tableName, _) =>
         val tableIdentifier = TableIdentifier(parentTableName, databaseNameOp)
         val isParentTableExists = sparkSession.sessionState.catalog.tableExists(tableIdentifier)
         if (!isParentTableExists) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index dd511ee..be0c614 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -351,6 +351,14 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
                 'false', 'parentTablePath' = '${carbonTable.getTablePath}',
                 'parentTableId' = '${carbonTable.getCarbonTableIdentifier.getTableId}')""")
           .collect()
+
+        // Refresh the index table
+        CarbonEnv
+          .getInstance(sparkSession)
+          .carbonMetaStore
+          .lookupRelation(indexModel.dbName, indexTableName)(sparkSession)
+          .asInstanceOf[CarbonRelation]
+          .carbonTable
       }
 
       CarbonIndexUtil.addIndexTableInfo(IndexType.SI.getIndexProviderName,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
index 76d3a6e..08f421d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableDropColumnEventListener.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.execution.command.AlterTableDropColumnModel
 import org.apache.spark.sql.execution.command.index.DropIndexCommand
 import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.events.{AlterTableDropColumnPreEvent, Event, OperationContext, OperationEventListener}
 
@@ -71,27 +71,26 @@ class AlterTableDropColumnEventListener extends OperationEventListener {
       return
     }
     secondaryIndexMap.asScala.foreach(indexTable => {
-        var colSize = 0
-        indexTable._2.asScala.foreach(column =>
-          if (alterTableDropColumnModel.columns.contains(column)) {
-            colSize += 1
-          })
-        if (colSize > 0) {
-          if (colSize == indexTable._2.size) {
-            indexTableToDrop ++= Seq(indexTable._1)
-          } else {
-            sys
-              .error(s"Index Table [${
-                indexTable
-                  ._1
-              }] exists with combination of provided drop column(s) and other columns, drop " +
-                     s"index table & retry")
-          }
+      val indexColumns = indexTable._2.asScala(CarbonCommonConstants.INDEX_COLUMNS).split(",")
+      val colSize = alterTableDropColumnModel.columns.intersect(indexColumns).size
+      if (colSize > 0) {
+        if (colSize == indexColumns.size) {
+          indexTableToDrop ++= Seq(indexTable._1)
+        } else {
+          sys
+            .error(s"Index Table [${
+              indexTable._1
+            }] exists with combination of provided drop column(s) and other columns, drop " +
+                   s"index table & retry")
         }
-      })
+      }
+    })
     indexTableToDrop.foreach { indexTable =>
-      DropIndexCommand(ifExistsSet = true, Some(dbName), indexTable.toLowerCase, tableName)
-        .run(sparkSession)
+      DropIndexCommand(ifExistsSet = true,
+        Some(dbName),
+        tableName,
+        indexTable.toLowerCase,
+        needLock = false).run(sparkSession)
     }
   }
 }