Posted to commits@carbondata.apache.org by in...@apache.org on 2021/06/16 09:23:25 UTC

[carbondata] branch master updated: [CARBONDATA-4206] Support rename SI table

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f1da9e8  [CARBONDATA-4206] Support rename SI table
f1da9e8 is described below

commit f1da9e8c155297ea22808e5b01137cd84e9561d9
Author: jack86596 <ja...@gmail.com>
AuthorDate: Thu Jun 10 19:35:39 2021 +0800

    [CARBONDATA-4206] Support rename SI table
    
    Why is this PR needed?
    Currently, renaming an SI table succeeds, but after the rename, inserts and queries on the
    main table fail with a "no such table" exception. This happens because the main table's
    tblproperties are not updated when the SI table is renamed: they still store the old SI
    table name, so any reference to the SI table looks it up by the old name and fails.
    
    What changes were proposed in this PR?
    After the SI table is renamed, update the main table's tblproperties with the new SI
    information, as sketched below.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4149
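    
    A minimal sketch of the scenario this fixes, using the same statements as the new test
    added in TestRenameTableWithIndex.scala (the x1 table and idx_x1_mac index are the names
    used there):
    
        sql("create table if not exists x1 (imei string, mac string) stored as carbondata")
        sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
        sql("alter table idx_x1_mac rename to idx_x1_mac1")
        // Before this fix, x1's tblproperties still referenced idx_x1_mac, so the
        // statements below threw a "no such table" exception:
        sql("insert into x1 select '1', '2'")
        sql("select count(*) from x1 where mac = '2'")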
---
 .../metadata/schema/indextable/IndexMetadata.java  | 15 +++++
 .../schema/CarbonAlterTableRenameCommand.scala     | 74 +++++++++++++++-------
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 24 +++++--
 .../events/AlterTableRenameEventListener.scala     | 11 ++--
 .../createTable/TestRenameTableWithIndex.scala     | 45 +++++++++++++
 5 files changed, 135 insertions(+), 34 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexMetadata.java
index 066cd8d..fa7fc73 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/indextable/IndexMetadata.java
@@ -97,6 +97,17 @@ public class IndexMetadata implements Serializable {
     }
   }
 
+  public void renameIndexWithStatus(String indexProvider, String oldIndexName,
+      String newIndexName, String indexStatus) {
+    if (null != indexProviderMap) {
+      Map<String, String> properties = indexProviderMap.get(indexProvider).remove(oldIndexName);
+      if (properties != null) {
+        properties.put(CarbonCommonConstants.INDEX_STATUS, indexStatus);
+        indexProviderMap.get(indexProvider).put(newIndexName, properties);
+      }
+    }
+  }
+
   public void updateIndexStatus(String indexProvider, String indexName, String indexStatus) {
     if (null != indexProviderMap) {
       indexProviderMap.get(indexProvider).get(indexName)
@@ -162,4 +173,8 @@ public class IndexMetadata implements Serializable {
   public String getIndexColumns(String provider, String indexName) {
     return indexProviderMap.get(provider).get(indexName).get(CarbonCommonConstants.INDEX_COLUMNS);
   }
+
+  public String getIndexStatus(String provider, String indexName) {
+    return indexProviderMap.get(provider).get(indexName).get(CarbonCommonConstants.INDEX_STATUS);
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 05052aa..ad4f831 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
+import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -29,7 +30,9 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.index.IndexStoreManager
+import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
@@ -69,31 +72,26 @@ private[sql] case class CarbonAlterTableRenameCommand(
       throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
     }
 
-    var oldCarbonTable: CarbonTable = null
-    oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-      .asInstanceOf[CarbonRelation].carbonTable
-    if (!oldCarbonTable.getTableInfo.isTransactionalTable) {
+    var carbonTable: CarbonTable = relation.carbonTable
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
-    if (!oldCarbonTable.canAllow(oldCarbonTable, TableOperation.ALTER_RENAME)) {
+    if (!carbonTable.canAllow(carbonTable, TableOperation.ALTER_RENAME)) {
       throw new MalformedCarbonCommandException("alter rename is not supported for this table")
     }
     // if table have created MV, not support table rename
-    if (MVManagerInSpark.get(sparkSession).hasSchemaOnTable(oldCarbonTable) ||
-        oldCarbonTable.isMV) {
+    if (MVManagerInSpark.get(sparkSession).hasSchemaOnTable(carbonTable) || carbonTable.isMV) {
       throw new MalformedCarbonCommandException(
         "alter rename is not supported for MV table or for tables which have child MV")
     }
 
     var timeStamp = 0L
-    var carbonTable: CarbonTable = null
     var hiveRenameSuccess = false
     // lock file path to release locks after operation
     var carbonTableLockFilePath: String = null
+    var originalIndexStatusBeforeDisable: IndexStatus = null
     try {
-      carbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
-        .asInstanceOf[CarbonRelation].carbonTable
       carbonTableLockFilePath = carbonTable.getTablePath
       // if any load is in progress for table, do not allow rename table
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
@@ -104,13 +102,19 @@ private[sql] case class CarbonAlterTableRenameCommand(
       IndexStoreManager.getInstance().clearIndex(oldAbsoluteTableIdentifier)
       // get the latest carbon table and check for column existence
       val operationContext = new OperationContext
-      // TODO: Pass new Table Path in pre-event.
-      val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
-        carbonTable,
-        alterTableRenameModel,
-        "",
-        sparkSession)
-      OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
+      if (carbonTable.isIndexTable) {
+        val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
+        val parentTableName = carbonTable.getParentTableName
+        val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
+          Some(oldDatabaseName), parentTableName)(sparkSession)
+        originalIndexStatusBeforeDisable = CarbonIndexUtil.updateIndexInfo(
+          parentTable, oldIndexName, IndexType.SI, IndexStatus.DISABLED)(sparkSession)
+      } else {
+        // TODO: Pass new Table Path in pre-event.
+        val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
+          carbonTable, alterTableRenameModel, "", sparkSession)
+        OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
+      }
       val tableInfo: org.apache.carbondata.format.TableInfo =
         metastore.getThriftTableInfo(carbonTable)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
@@ -142,12 +146,24 @@ private[sql] case class CarbonAlterTableRenameCommand(
         carbonTable.getTablePath)(sparkSession)
       new MockClassForAlterRevertTests().mockForAlterRevertTest()
 
-      val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
-        carbonTable,
-        alterTableRenameModel,
-        oldAbsoluteTableIdentifier.getTablePath,
-        sparkSession)
-      OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
+      if (carbonTable.isIndexTable) {
+        val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
+        val parentTableName = carbonTable.getParentTableName
+        val parentTable: CarbonTable = metastore
+          .lookupRelation(Some(oldDatabaseName), parentTableName)(sparkSession)
+          .asInstanceOf[CarbonRelation].carbonTable
+        CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
+          originalIndexStatusBeforeDisable, newTableName)(sparkSession)
+        metastore.lookupRelation(Some(oldDatabaseName), newTableName)(sparkSession)
+          .asInstanceOf[CarbonRelation]
+      } else {
+        val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
+          carbonTable,
+          alterTableRenameModel,
+          oldAbsoluteTableIdentifier.getTablePath,
+          sparkSession)
+        OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
+      }
 
       sparkSession.catalog.refreshTable(newTableIdentifier.quotedString)
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
@@ -163,6 +179,18 @@ private[sql] case class CarbonAlterTableRenameCommand(
             sparkSession,
             carbonTable.isExternalTable)
         }
+        // it means rename table is index table and disable index has succeed, need revert
+        if (originalIndexStatusBeforeDisable != null &&
+            !originalIndexStatusBeforeDisable.equals(IndexStatus.DISABLED)) {
+          val oldIndexName = alterTableRenameModel.oldTableIdentifier.table
+          val parentTableName = carbonTable.getParentTableName
+          val parentTable: CarbonTable = CarbonEnv.getCarbonTable(
+            Some(oldDatabaseName), parentTableName)(sparkSession)
+          CarbonIndexUtil.updateIndexInfo(parentTable, oldIndexName, IndexType.SI,
+            originalIndexStatusBeforeDisable)(sparkSession)
+          metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
+            .asInstanceOf[CarbonRelation]
+        }
         if (carbonTable != null) {
           AlterTableUtil.revertRenameTableChanges(
             newTableName,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index a78a6fa..2fcc268 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -435,12 +435,17 @@ object CarbonIndexUtil {
     }
   }
 
-  def updateIndexStatus(carbonTable: CarbonTable,
+  /**
+   *
+   * @param newIndexName nonEmpty means rename index
+   * @return original index status, null means update failed
+   */
+  def updateIndexInfo(carbonTable: CarbonTable,
       indexName: String,
       indexType: IndexType,
       status: IndexStatus,
-      needLock: Boolean = true,
-      sparkSession: SparkSession): Unit = {
+      newIndexName: String = "",
+      needLock: Boolean = true)(sparkSession: SparkSession): IndexStatus = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     val locks: java.util.List[ICarbonLock] = new java.util.ArrayList[ICarbonLock]
@@ -458,8 +463,15 @@ object CarbonIndexUtil {
       }
       CarbonMetadata.getInstance.removeTable(dbName, tableName)
       val table = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      val indexInfo = IndexTableInfo.setIndexStatus(table.getIndexInfo, indexName, status)
       val indexMetadata = table.getIndexMetadata
+      val originalStatus = indexMetadata.getIndexStatus(indexType.getIndexProviderName, indexName)
+      if (newIndexName.isEmpty) {
+        indexMetadata.updateIndexStatus(indexType.getIndexProviderName, indexName, status.name())
+      } else {
+        indexMetadata.renameIndexWithStatus(indexType.getIndexProviderName,
+          indexName, newIndexName, status.name())
+      }
+      val newIndexInfo = table.getIndexInfo
       indexMetadata.updateIndexStatus(indexType.getIndexProviderName, indexName, status.name())
       table.getTableInfo
         .getFactTable
@@ -467,14 +479,16 @@ object CarbonIndexUtil {
         .put(table.getCarbonTableIdentifier.getTableId, indexMetadata.serialize)
       sparkSession.sql(
         s"""ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES ('indexInfo' =
-           |'$indexInfo')"""
+           |'$newIndexInfo')"""
           .stripMargin).collect()
       CarbonHiveIndexMetadataUtil.refreshTable(dbName, tableName, sparkSession)
       CarbonMetadata.getInstance.removeTable(dbName, tableName)
       CarbonMetadata.getInstance.loadTableMetadata(table.getTableInfo)
+      IndexStatus.valueOf(originalStatus)
     } catch {
       case e: Exception =>
         LOGGER.error("Failed to update index status for %s".format(indexName))
+        null
     } finally {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableRenameEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableRenameEventListener.scala
index 2a4f40b..558a136 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableRenameEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableRenameEventListener.scala
@@ -23,7 +23,6 @@ import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.index.CarbonIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.index.IndexType
@@ -40,11 +39,11 @@ class AlterTableRenameEventListener extends OperationEventListener with Logging
    */
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
-      case alterTableRenamePreEvent: AlterTableRenamePostEvent =>
-        LOGGER.info("alter table rename Pre event listener called")
-        val alterTableRenameModel = alterTableRenamePreEvent.alterTableRenameModel
-        val carbonTable = alterTableRenamePreEvent.carbonTable
-        val sparkSession = alterTableRenamePreEvent.sparkSession
+      case alterTableRenamePostEvent: AlterTableRenamePostEvent =>
+        LOGGER.info("alter table rename post event listener called")
+        val alterTableRenameModel = alterTableRenamePostEvent.alterTableRenameModel
+        val carbonTable = alterTableRenamePostEvent.carbonTable
+        val sparkSession = alterTableRenamePostEvent.sparkSession
         val oldDatabaseName = carbonTable.getDatabaseName
         val newTableName = alterTableRenameModel.newTableIdentifier.table
         val metastore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
index 947a33a..b416353 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithIndex.scala
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hive.MockClassForAlterRevertTests
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 /**
  * test functionality for alter table with indexSchema
@@ -34,6 +38,7 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon_table")
     sql("DROP TABLE IF EXISTS carbon_tb")
     sql("DROP TABLE IF EXISTS fact_table1")
+    sql("DROP TABLE IF EXISTS x1")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
   }
@@ -116,6 +121,45 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
       true, "dm_carbon_si")
   }
 
+  test("rename index table success, insert new record success" +
+       " and query hit new index table") {
+    sql("create table if not exists x1 (imei string, mac string) stored as carbondata")
+    sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
+    sql("alter table idx_x1_mac rename to idx_x1_mac1")
+    checkAnswer(sql("show indexes on x1"),
+      Row("idx_x1_mac1", "carbondata", "mac", "NA", "enabled", "NA"))
+    checkAnswer(sql("insert into x1 select '1', '2'"), Row("0"))
+    assert(sql("explain select * from x1 where mac = '2'")
+      .collect()(1).getString(0).contains("idx_x1_mac1"))
+    checkAnswer(sql("select count(*) from x1 where mac = '2'"), Row(1))
+    sql("DROP TABLE IF EXISTS x1")
+  }
+
+  test("rename index table fail, revert success, insert new record success" +
+       " and query hit old index table") {
+    val mock: MockUp[MockClassForAlterRevertTests] = new MockUp[MockClassForAlterRevertTests]() {
+      @Mock
+      @throws[ProcessMetaDataException]
+      def mockForAlterRevertTest(): Unit = {
+        throw new ProcessMetaDataException("default", "idx_x1_mac", "thrown in mock")
+      }
+    }
+    sql("create table if not exists x1 (imei string, mac string) stored as carbondata")
+    sql("create index idx_x1_mac on table x1(mac) as 'carbondata'")
+    intercept[ProcessMetaDataException] {
+      sql("alter table idx_x1_mac rename to idx_x1_mac1")
+    }
+    checkAnswer(sql("show indexes on x1"),
+      Row("idx_x1_mac", "carbondata", "mac", "NA", "enabled", "NA"))
+    checkAnswer(sql("insert into x1 select '1', '2'"), Row("0"))
+    val plan = sql("explain select * from x1 where mac = '2'").collect()(1).getString(0)
+    assert(plan.contains("idx_x1_mac"))
+    assert(!plan.contains("idx_x1_mac1"))
+    checkAnswer(sql("select count(*) from x1 where mac = '2'"), Row(1))
+    sql("DROP TABLE IF EXISTS x1")
+    mock.tearDown();
+  }
+
   /*
    * mv indexSchema does not support running here, now must run in mv project.
   test("Creating a mv indexSchema,then table rename") {
@@ -155,6 +199,7 @@ class TestRenameTableWithIndex extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon_table")
     sql("DROP TABLE IF EXISTS carbon_tb")
     sql("DROP TABLE IF EXISTS fact_table1")
+    sql("DROP TABLE IF EXISTS x1")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
         CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)