Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/02/17 09:48:25 UTC

[carbondata] branch master updated: [CARBONDATA-4125] SI compatibility issue fix

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 791857b  [CARBONDATA-4125] SI compatibility issue fix
791857b is described below

commit 791857b6dc51c10178d8c0862e21991e52fdc09d
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Feb 3 20:36:26 2021 +0530

    [CARBONDATA-4125] SI compatibility issue fix
    
    Why is this PR needed?
    Currently, while upgrading a table store with SI, we have to execute the REFRESH TABLE
    and REGISTER INDEX commands to refresh the tables and register the index to the main
    table. Also, during SI creation, we add a property 'indexTableExists' to the main table
    to identify whether the table has an SI. If a table has an SI, then we load the index
    information for that table from Hive
    {org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo}.
    indexTableExists defaults to 'false' for all tables that do not have an SI; for SI
    tables, this property is not added.
    
    {org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo} is
    called on any command to refresh the index info. The indexTableExists property must be
    either true (main table) or null (SI) in order to fetch the index information from Hive
    and set it on the carbon table.
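    
    For reference, a minimal sketch of this upgrade flow in a Spark shell, assuming
    hypothetical table names (maintable and its SI maintable_index):
    
        // refresh the upgraded tables, then register the SI on the main table
        sparkSession.sql("REFRESH TABLE maintable")
        sparkSession.sql("REFRESH TABLE maintable_index")
        sparkSession.sql("REGISTER INDEX TABLE maintable_index ON maintable")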
    
    Issue 1:
    While upgrading tables with SI, after refreshing the main table and SI, if the user runs
    any operation like Select or Show cache, the indexTableExists property is added as false
    on the SI table. After register index, on any operation on the SI (load or select),
    {org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore#refreshIndexInfo} does
    not update the index information on the SI table, since indexTableExists is false. Hence,
    loads to the SI will fail.
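    
    The guard in refreshIndexInfo behaves roughly like this sketch (simplified, not the
    exact implementation), which shows why a stale 'false' on the SI blocks the refresh:
    
        // index info is only (re)loaded from Hive when the property is absent
        // (SI) or "true" (main table); a leftover "false" on the SI
        // short-circuits the refresh, leaving the SI's index info unset
        val indexTableExists = carbonTable.getTableInfo.getFactTable
          .getTableProperties.get("indextableexists")
        if (indexTableExists == null || indexTableExists.toBoolean) {
          // read indexInfo from Hive and set it on the carbon table
        }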
    
    Issue 2:
    While upgrading tables with SI, after refreshing the main table and SI, if the user runs
    any operation like update, alter or delete on the SI table, then while registering it as
    an index, the alter operations done on that table are not validated.
    
    What changes were proposed in this PR?
    Issue 1:
    While registering an SI table as an index, check if the SI table has the
    indexTableExists property and remove it. For an already registered index, allow
    re-registering the index so the property can be removed.
    
    Issue 2:
    Added validations to check whether the SI has undergone a load/update/delete/alter
    operation before registering it as an index, and throw an exception if it has.
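    
    For example, registering an SI that was updated after the refresh now fails fast
    (hypothetical session; the error text comes from the new validation below):
    
        // the SI was updated between REFRESH and REGISTER INDEX
        sparkSession.sql("REGISTER INDEX TABLE maintable_index ON maintable")
        // throws ErrorMessage: "Cannot Register Secondary index table
        // maintable_index, as it has undergone update or delete operation.
        // Try Drop and recreate SI."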
    
    This closes #4087
---
 .../load/CarbonInternalLoaderUtil.java             | 14 +++-
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  5 +-
 .../command/RegisterIndexTableCommand.scala        | 14 ++--
 .../secondaryindex/command/SICreationCommand.scala | 93 +++++++++++++++++++---
 4 files changed, 110 insertions(+), 16 deletions(-)

diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index dd8ea6e..5c233ed 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -42,6 +42,7 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil;
 
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.index.CarbonIndexUtil;
+import org.apache.spark.sql.secondaryindex.command.ErrorMessage;
 
 public class CarbonInternalLoaderUtil {
 
@@ -326,12 +327,20 @@ public class CarbonInternalLoaderUtil {
     return tableStatusUpdateStatus;
   }
 
+  public static boolean checkMainTableSegEqualToSISeg(
+      LoadMetadataDetails[] mainTableLoadMetadataDetails,
+      LoadMetadataDetails[] siTableLoadMetadataDetails) throws ErrorMessage {
+    return checkMainTableSegEqualToSISeg(mainTableLoadMetadataDetails, siTableLoadMetadataDetails,
+        false);
+  }
+
   /**
    * Method to check if main table and SI have same number of valid segments or not
    */
   public static boolean checkMainTableSegEqualToSISeg(
       LoadMetadataDetails[] mainTableLoadMetadataDetails,
-      LoadMetadataDetails[] siTableLoadMetadataDetails) {
+      LoadMetadataDetails[] siTableLoadMetadataDetails, boolean isRegisterIndex)
+      throws ErrorMessage {
     List<String> mainTableSegmentsList = getListOfValidSlices(mainTableLoadMetadataDetails);
     List<String> indexList = getListOfValidSlices(siTableLoadMetadataDetails);
     Collections.sort(mainTableSegmentsList);
@@ -341,6 +350,9 @@ public class CarbonInternalLoaderUtil {
     // than SI Segments
     if (indexList.size() < mainTableSegmentsList.size()) {
       return false;
+    } else if ((indexList.size() > mainTableSegmentsList.size()) && isRegisterIndex) {
+      throw new ErrorMessage("Cannot register index, as the number of Secondary index table "
+          + "segments are more than the main table segments. Try Drop and recreate SI.");
     }
     // There can be cases when the number of segments in the main table are less than the index
     // table. In this case mapping all the segments in main table to SI table.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 3f229b0..e24f52c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -342,7 +342,7 @@ object CarbonIndexUtil {
    */
   def addOrModifyTableProperty(carbonTable: CarbonTable,
     properties: Map[String, String],
-    needLock: Boolean = true)
+    needLock: Boolean = true, propertyToBeRemoved: String = null)
     (sparkSession: SparkSession): Unit = {
     val tableName = carbonTable.getTableName
     val dbName = carbonTable.getDatabaseName
@@ -374,6 +374,9 @@ object CarbonIndexUtil {
           tblPropertiesMap.put(property._1, property._2)
         }
       }
+      if (null != propertyToBeRemoved) {
+        tblPropertiesMap.remove(propertyToBeRemoved)
+      }
       val tableIdentifier = AlterTableUtil.updateSchemaInfo(
         carbonTable = carbonTable,
         thriftTable = thriftTable)(sparkSession)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
index f35cc85..903506a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/RegisterIndexTableCommand.scala
@@ -49,10 +49,6 @@ case class RegisterIndexTableCommand(dbName: Option[String], indexTableName: Str
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val databaseName = CarbonEnv.getDatabaseName(dbName)(sparkSession)
-    val databaseLocation = CarbonEnv.getDatabaseLocation(databaseName, sparkSession)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName,
-      indexTableName)
     setAuditTable(databaseName, indexTableName)
     setAuditInfo(Map("Parent TableName" -> parentTable))
     // 1. check if the main and index table exist
@@ -71,16 +67,24 @@ case class RegisterIndexTableCommand(dbName: Option[String], indexTableName: Str
                             s" database [$databaseName]"
       CarbonException.analysisException(message)
     }
+    val indexTable = CarbonEnv.getCarbonTable(dbName, indexTableName)(sparkSession)
+    // get table path from carbon table, instead of creating table path, since the SI table can
+    // be renamed before register index
+    val tablePath = indexTable.getTablePath
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, databaseName,
+      indexTableName)
     // 2. Read TableInfo
     val tableInfo = SchemaReader.getTableInfo(absoluteTableIdentifier)
     val columns: List[String] = getIndexColumn(tableInfo)
     val secondaryIndex = IndexModel(dbName, parentTable.toLowerCase, columns,
       indexTableName.toLowerCase)
+    val properties = tableInfo.getFactTable.getTableProperties
+    properties.put("tablePath", tablePath)
     // 3. Call the create index command with isCreateSIndex = false
     // (do not create the si table in store path)
     CarbonCreateSecondaryIndexCommand(
       indexModel = secondaryIndex,
-      tableProperties = tableInfo.getFactTable.getTableProperties.asScala,
+      tableProperties = properties.asScala,
       ifNotExists = false,
       isDeferredRefresh = false,
       isCreateSIndex = false).run(sparkSession)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index dcf34ee..d3aebd9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -89,7 +89,11 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     val dbLocation = CarbonEnv.getDatabaseLocation(databaseName, sparkSession)
     val indexTableName = indexModel.indexName
 
-    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName
+    val tablePath: String = if (isCreateSIndex) {
+      dbLocation + CarbonCommonConstants.FILE_SEPARATOR + indexTableName
+    } else {
+      tableProperties("tablePath")
+    }
     setAuditTable(databaseName, indexTableName)
     setAuditInfo(Map(
       "Column names" -> indexModel.columnNames.toString(),
@@ -164,6 +168,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       val indexTableExistsInCarbon = indexTables.asScala.contains(indexTableName)
       val indexTableExistsInHive = sparkSession.sessionState.catalog
         .tableExists(TableIdentifier(indexTableName, indexModel.dbName))
+      val isRegisterIndex = !isCreateSIndex
       if (indexTableExistsInHive && isCreateSIndex) {
         if (!ifNotExists) {
           LOGGER.error(
@@ -224,6 +229,11 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
           "number of key columns in Source table")
       }
       if (indexModel.columnNames.exists(x => !dimNames.contains(x))) {
+        if (isRegisterIndex) {
+          throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName, " +
+                                 s"as it has column(s) which does not exists in $tableName. " +
+                                 s"Try Drop and recreate SI.")
+        }
         throw new ErrorMessage(
           s"one or more specified index cols either does not exist or not a key column or complex" +
           s" column in table $databaseName.$tableName")
@@ -239,6 +249,29 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         throw new ErrorMessage(
           s"Table [$tableName] under database [$databaseName] is already an index table")
       }
+      val absoluteTableIdentifier = AbsoluteTableIdentifier.
+        from(tablePath, databaseName, indexTableName)
+      val indexTablePath = CarbonTablePath
+        .getMetadataPath(absoluteTableIdentifier.getTablePath)
+      val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      var siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(indexTablePath)
+      if (isRegisterIndex) {
+        // check if SI segments are more than main table segments
+        CarbonInternalLoaderUtil
+          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+            siTblLoadMetadataDetails, isRegisterIndex)
+        // check if SI table has undergone any Update or delete operation, which can happen in
+        // case of compatibility scenario. IUD after Refresh SI and before register index
+        val updatedSegmentsCount = siTblLoadMetadataDetails.filter(loadMetaDetail =>
+          !loadMetaDetail.getUpdateStatusFileName.equals(""))
+        if (!updatedSegmentsCount.isEmpty) {
+          throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName" +
+                                 ", as it has undergone update or delete operation. " +
+                                 "Try Drop and recreate SI.")
+        }
+      }
 
       // creation of index on long string or binary columns are not supported
       val errorMsg = "one or more index columns specified contains long string or binary column" +
@@ -267,8 +300,6 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
           databaseName, indexTableName,
           indexProperties),
         true)
-      val absoluteTableIdentifier = AbsoluteTableIdentifier.
-        from(tablePath, databaseName, indexTableName)
       var tableInfo: TableInfo = null
       // if Register Index call then read schema file from the metastore
       if (!isCreateSIndex && indexTableExistsInHive) {
@@ -278,6 +309,33 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
           carbonTable, databaseName,
           tableName, indexTableName, absoluteTableIdentifier)
       }
+      if (isRegisterIndex && null != tableInfo.getFactTable.getSchemaEvolution &&
+          null != carbonTable.getTableInfo.getFactTable.getSchemaEvolution) {
+        // check if SI table has undergone any alter schema operation before registering it
+        val indexTableSchemaEvolutionEntryList = tableInfo
+          .getFactTable
+          .getSchemaEvolution
+          .getSchemaEvolutionEntryList
+        val mainTableSchemaEvolutionEntryList = carbonTable
+          .getTableInfo
+          .getFactTable
+          .getSchemaEvolution
+          .getSchemaEvolutionEntryList
+        if (indexTableSchemaEvolutionEntryList.size() > mainTableSchemaEvolutionEntryList.size()) {
+          val index = mainTableSchemaEvolutionEntryList.size()
+          for (i <- index until indexTableSchemaEvolutionEntryList.size()) {
+            val schemaEntry = indexTableSchemaEvolutionEntryList.get(i)
+            val isSITableRenamed =
+              (schemaEntry.getAdded == null && schemaEntry.getRemoved == null) ||
+              (schemaEntry.getAdded.isEmpty && schemaEntry.getRemoved.isEmpty)
+            if (!isSITableRenamed) {
+              throw new ErrorMessage(s"Cannot Register Secondary index table $indexTableName" +
+                                     ", as it has undergone column schema addition or deletion. " +
+                                     "Try Drop and recreate SI.")
+            }
+          }
+        }
+      }
       if (!isCreateSIndex && !indexTableExistsInHive) {
         LOGGER.error(
           s"Index registration with Database name [$databaseName] and index name " +
@@ -338,6 +396,8 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
                 'parentTableId' = '${carbonTable.getCarbonTableIdentifier.getTableId}')""")
           .collect()
 
+        checkAndRemoveIndexTableExistsProperty(sparkSession, indexTableName)
+
         // Refresh the index table
         CarbonEnv
           .getInstance(sparkSession)
@@ -374,11 +434,7 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       if (isCreateSIndex) {
         LoadDataForSecondaryIndex(indexModel).run(sparkSession)
       }
-      val indexTablePath = CarbonTablePath
-        .getMetadataPath(tableInfo.getOrCreateAbsoluteTableIdentifier.getTablePath)
-      val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-      val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      siTblLoadMetadataDetails =
         SegmentStatusManager.readLoadMetadata(indexTablePath)
       val isMainTableSegEqualToSISegs = CarbonInternalLoaderUtil
         .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
@@ -396,7 +452,14 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
         s"Index created with Database name [$databaseName] and Index name [$indexTableName]")
     } catch {
       case err@(_: ErrorMessage | _: IndexTableExistException) =>
-        sys.error(err.getMessage)
+        if (err.getMessage.contains("Index Table with selected columns already exist") &&
+            !isCreateSIndex) {
+          checkAndRemoveIndexTableExistsProperty(sparkSession, indexTableName)
+          LOGGER.warn(s"Table [$indexTableName] has been already registered as Secondary " +
+                      s"Index table with table [$databaseName.${ indexModel.tableName }].")
+        } else {
+          sys.error(err.getMessage)
+        }
       case ex@(_: IOException | _: ParseException) =>
         LOGGER.error(s"Index creation with Database name [$databaseName] " +
                      s"and Index name [$indexTableName] is failed")
@@ -415,6 +478,18 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
     Seq.empty
   }
 
+  private def checkAndRemoveIndexTableExistsProperty(sparkSession: SparkSession,
+      indexTableName: String): Unit = {
+    // modify the tableProperties of indexTable by removing "indexTableExists" property
+    val indexTable = CarbonEnv.getCarbonTable(indexModel.dbName, indexTableName)(sparkSession)
+    if (indexTable.getTableInfo.getFactTable.getTableProperties.containsKey("indextableexists")) {
+      CarbonIndexUtil
+        .addOrModifyTableProperty(
+          indexTable,
+          Map(), needLock = false, "indextableexists")(sparkSession)
+    }
+  }
+
   def prepareTableInfo(carbonTable: CarbonTable,
       databaseName: String, tableName: String, indexTableName: String,
       absoluteTableIdentifier: AbsoluteTableIdentifier): TableInfo = {