You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2021/09/16 03:19:40 UTC

[carbondata] branch master updated: [CARBONDATA-4277] geo instance compatibility fix

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7199357  [CARBONDATA-4277] geo instance compatibility fix
7199357 is described below

commit 719935795f2dbc165cca304a47a0b456aac54e8c
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Tue Sep 14 13:37:48 2021 +0530

    [CARBONDATA-4277] geo instance compatibility fix
    
    Why is this PR needed?
    The CustomIndex class implements Serializable, so when a store written
    by a different version is read and the serialVersionUID does not match,
    java.io.InvalidClassException is thrown during load/update/query operations.
    
    What changes were proposed in this PR?
    As the instance is stored in the table properties, the refresh table
    command now re-initializes the instance and updates it there. Also added
    a static serialVersionUID to the CustomIndex class.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No, tested in cluster
    
    This closes #4216
---
 .../apache/carbondata/core/util/CustomIndex.java   |  2 ++
 docs/spatial-index-guide.md                        |  2 ++
 .../spark/sql/catalyst/CarbonParserUtil.scala      | 30 +++++++++++++---------
 .../management/RefreshCarbonTableCommand.scala     | 24 ++++++++++++++++-
 .../org/apache/spark/util/AlterTableUtil.scala     |  4 ++-
 5 files changed, 48 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
index 0effbc6..1d5c390 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CustomIndex.java
@@ -35,6 +35,8 @@ import java.util.Map;
  * @param <ReturnType>
  */
 public abstract class CustomIndex<ReturnType> implements Serializable {
+
+  private static final long serialVersionUID = 6529685098267757692L;
   /**
    * Initialize the custom index instance.
    * @param indexName
diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index 8484d41..08d487d 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -78,6 +78,8 @@ Note:
    * `mygeohash` in the above example represent the index name.
    * Columns present in spatial_index table properties cannot be altered
     i.e., sourcecolumns: `longitude, latitude` and index column: `mygeohash` in the above example.
+   * To make the spatial instance compatible with previous versions, trigger the refresh table command.
+     In a direct upgrade scenario, if the spatial table already exists, the refresh command fails but still updates the instance property in metadata.
 
 #### List of spatial index table properties
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 7a5f53f..3c4ed7b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -91,6 +91,23 @@ object CarbonParserUtil {
     }
   }
 
+  /**
+   * Creates the spatial index instance and records it in the table properties.
+   *
+   * Loads the class named by `spatialIndexClassName`, instantiates it through its no-argument
+   * constructor, calls `init(indexName, tableProperties)` on it and stores the value returned by
+   * `CustomIndex.getCustomInstance` under the `SPATIAL_INDEX.indexName.instance` property key.
+   *
+   * @param spatialIndexClassName fully qualified name of the CustomIndex implementation to load
+   * @param indexName             name of the spatial index; used to build the property key
+   * @param tableProperties       table properties handed to `init` and updated in place
+   * @throws MalformedCarbonCommandException if the class cannot be found, instantiated or
+   *                                         accessed, or if it is not a CustomIndex
+   */
+  def initializeSpatialIndexInstance(spatialIndexClassName: String, indexName: String,
+      tableProperties: mutable.Map[String, String]): Unit = {
+    val SPATIAL_INDEX_INSTANCE = s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.instance"
+    try {
+      val spatialIndexClass : Class[_] = java.lang.Class.forName(spatialIndexClassName)
+      val instance = spatialIndexClass.newInstance().asInstanceOf[CustomIndex[_]]
+      instance.init(indexName, tableProperties.asJava)
+      tableProperties.put(SPATIAL_INDEX_INSTANCE, CustomIndex.getCustomInstance(instance))
+    } catch {
+      // Class.newInstance() signals an abstract class, an interface or a missing no-arg
+      // constructor with InstantiationException; InstantiationError is kept for compatibility.
+      case ex@(_: ClassNotFoundException | _: InstantiationException | _: InstantiationError |
+               _: IllegalAccessException | _: ClassCastException) =>
+        val err = s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property process failed. "
+        LOGGER.error(err, ex)
+        throw new MalformedCarbonCommandException(err, ex)
+    }
+  }
+
   /**
    * The method parses, validates and processes the spatial_index property.
    * @param tableProperties Table properties
@@ -170,18 +187,7 @@ object CarbonParserUtil {
                 s"Unsupported value: ${ spatialIndexType.get } specified for property $TYPE.")
             }
         }
-        try {
-          val spatialIndexClass : Class[_] = java.lang.Class.forName(spatialIndexClassName)
-          val instance = spatialIndexClass.newInstance().asInstanceOf[CustomIndex[_]]
-          instance.init(indexName, tableProperties.asJava)
-          tableProperties.put(SPATIAL_INDEX_INSTANCE, CustomIndex.getCustomInstance(instance))
-        } catch {
-          case ex@(_: ClassNotFoundException | _: InstantiationError | _: IllegalAccessException |
-                   _: ClassCastException) =>
-            val err = s"Carbon ${ CarbonCommonConstants.SPATIAL_INDEX } property process failed. "
-            LOGGER.error(err, ex)
-            throw new MalformedCarbonCommandException(err, ex)
-        }
+        initializeSpatialIndexInstance(spatialIndexClassName, indexName, tableProperties)
         // Insert spatial column as a sort column if it is not already present in it.
         CarbonScalaUtil.insertColumnToSortColumns(indexName, sources, tableProperties)
         fields += Field(indexName, Some("BigInt"), Some(indexName), Some(null), spatialIndex = true)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index aea8369..160908d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -21,12 +21,13 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.CarbonParserUtil.initializeSpatialIndexInstance
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.util.SparkUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -83,6 +84,27 @@ case class RefreshCarbonTableCommand(
       if (FileFactory.isFileExist(schemaFilePath)) {
         // read TableInfo
         val tableInfo = SchemaReader.getTableInfo(identifier)
+        val tableProperties = tableInfo.getFactTable.getTableProperties
+        val spatialIndex = CarbonCommonConstants.SPATIAL_INDEX
+        val indexName = tableProperties.get(spatialIndex)
+        if (indexName != null) {
+          val SPATIAL_INDEX_CLASS = s"$spatialIndex.$indexName.class"
+          val SPATIAL_INDEX_INSTANCE = s"$spatialIndex.$indexName.instance"
+          // For spatial table, To make the instance compatible with previous versions,
+          // initialise and update the index instance in table properties.
+          tableProperties.remove(SPATIAL_INDEX_INSTANCE)
+          initializeSpatialIndexInstance(tableProperties.get(SPATIAL_INDEX_CLASS),
+            indexName, tableProperties.asScala)
+          val tableIdentifier = new TableIdentifier(tableName, Some(tableInfo.getDatabaseName))
+          if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
+            // In direct upgrade scenario, if spatial table already exists then on refresh command,
+            // update the property in metadata and fail table creation.
+            LOGGER.info(s"Updating $SPATIAL_INDEX_INSTANCE table property on $tableName")
+            AlterTableUtil.modifyTableProperties(tableIdentifier,
+              Map(SPATIAL_INDEX_INSTANCE -> tableProperties.get(SPATIAL_INDEX_INSTANCE)),
+              Seq.empty, true)(sparkSession, sparkSession.sessionState.catalog)
+          }
+        }
         // remove mv related info from source table properties
         tableInfo.getFactTable
           .getTableProperties.remove(CarbonCommonConstants.RELATED_MV_TABLES_MAP)
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 2168ba9..cfa6e31 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -485,7 +485,9 @@ object AlterTableUtil {
         // with the newly added/modified comment since thriftTable also holds comment as its
         // direct property.
         lowerCasePropertiesMap.foreach { property =>
-          if (validateTableProperties(property._1)) {
+          if (validateTableProperties(property._1) ||
+              (property._1.startsWith(CarbonCommonConstants.SPATIAL_INDEX) &&
+               property._1.endsWith("instance"))) {
             tblPropertiesMap.put(property._1, property._2)
           } else {
             val errorMessage = "Error: Invalid option(s): " + property._1.toString()