Posted to commits@carbondata.apache.org by in...@apache.org on 2022/03/04 08:13:19 UTC

[carbondata] branch master updated: [CARBONDATA-4326] MV not hitting with multiple sessions issue fix

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 19343a7  [CARBONDATA-4326] MV not hitting with multiple sessions issue fix
19343a7 is described below

commit 19343a7f8ce43929cdefc96a1d8a9344de280295
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Mar 2 20:58:14 2022 +0530

    [CARBONDATA-4326] MV not hitting with multiple sessions issue fix
    
    Why is this PR needed?
    An MV created in beeline is not hit from sql/shell, and vice versa, when
    beeline and sql/shell run in parallel. Currently, if the view catalog
    for a particular session is already initialized, the schemas are not
    reloaded on each access. So when an MV is created in another session
    and queried from the currently open session, that MV is not hit.
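
    A minimal sketch of the scenario, assuming two concurrent sessions on
    the same store (the session, table, and MV names here are hypothetical):

        // Session A (e.g. sql/shell): create a materialized view.
        sessionA.sql(
          "CREATE MATERIALIZED VIEW mv_sales AS " +
            "SELECT city, sum(amount) FROM sales GROUP BY city")

        // Session B (e.g. beeline) initialized its MV catalog earlier and
        // never reloads it, so this query is not rewritten to use mv_sales
        // and scans the fact table instead.
        sessionB.sql("SELECT city, sum(amount) FROM sales GROUP BY city").show()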
    
    What changes were proposed in this PR?
    1. Reload the MV catalog on every access by reading the schemas from the
    store path: register any schema not yet present in the catalog, and
    deregister any schema that has since been dropped (a simplified sketch
    of this reconciliation follows below).
    2. When a create SI (secondary index) command is triggered, there is no
    need to attempt a plan rewrite or check for MV schemas, so the plan is
    returned unchanged if a DeserializeToObject node is present.
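
    A simplified, generic sketch of the register/deregister reconciliation
    from change 1 (the helper syncCatalog and its parameters are
    illustrative, not the actual MVManager API):

        // Diff the schemas persisted in the store path against the schemas
        // held by the in-memory catalog, then reconcile in both directions.
        def syncCatalog[S](
            diskSchemas: Seq[S],     // schemas read from the store path
            catalogSchemas: Seq[S],  // schemas in the in-memory catalog
            register: S => Unit,
            deregister: S => Unit): Unit = {
          val onDisk = diskSchemas.toSet
          val inCatalog = catalogSchemas.toSet
          // Register MVs created by other sessions since the last sync.
          (onDisk -- inCatalog).foreach(register)
          // Deregister MVs dropped by other sessions.
          (inCatalog -- onDisk).foreach(deregister)
        }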
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No, tested on a cluster
    
    This closes #4251
---
 .../org/apache/carbondata/core/view/MVManager.java | 58 ++++++++++++++--------
 .../apache/carbondata/view/MVManagerInSpark.scala  | 20 +++++---
 .../command/view/CarbonCreateMVCommand.scala       |  2 +-
 .../command/view/CarbonRefreshMVCommand.scala      |  2 +-
 .../apache/spark/sql/optimizer/MVRewriteRule.scala |  3 +-
 5 files changed, 53 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
index 0618415..7253158 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVManager.java
@@ -181,32 +181,48 @@ public abstract class MVManager {
    */
   public MVCatalog<?> getCatalog(
       MVCatalogFactory<?> catalogFactory,
-      boolean reload) throws IOException {
+      List<MVSchema> currSchemas) throws IOException {
     MVCatalog<?> catalog = this.catalog;
-    if (reload || catalog == null) {
-      synchronized (lock) {
-        catalog = this.catalog;
-        if (reload || catalog == null) {
-          catalog = catalogFactory.newCatalog();
-          List<MVSchema> schemas = getSchemas();
-          if (null == catalog) {
-            throw new RuntimeException("Internal Error.");
+    synchronized (lock) {
+      catalog = this.catalog;
+      if (catalog == null) {
+        catalog = catalogFactory.newCatalog();
+      }
+      List<MVSchema> schemas = getSchemas();
+      if (schemas.size() == currSchemas.size() && currSchemas.containsAll(schemas)) {
+        return catalog;
+      }
+      for (MVSchema schema : schemas) {
+        try {
+          // register the schemas that are not already present in catalog.
+          if (!currSchemas.contains(schema)) {
+            catalog.registerSchema(schema);
           }
-          for (MVSchema schema : schemas) {
-            try {
-              catalog.registerSchema(schema);
-            } catch (Exception e) {
-              // Ignore the schema
-              LOGGER.error("Error while registering schema for mv: " + schema.getIdentifier()
-                  .getTableName());
-              if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(e.getMessage());
-              }
-            }
+        } catch (Exception e) {
+          // Ignore the schema
+          LOGGER.error(
+              "Error while registering schema for mv: " + schema.getIdentifier().getTableName());
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(e.getMessage());
+          }
+        }
+      }
+      for (MVSchema currSchema : currSchemas) {
+        try {
+          // deregister the schemas from catalog if not present in the path.
+          if (!schemas.contains(currSchema)) {
+            catalog.deregisterSchema(currSchema.getIdentifier());
+          }
+        } catch (Exception e) {
+          // Ignore the schema
+          LOGGER.error("Error while deregistering schema for mv: " + currSchema.getIdentifier()
+              .getTableName());
+          if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(e.getMessage());
           }
-          this.catalog = catalog;
         }
       }
+      this.catalog = catalog;
     }
     return catalog;
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
index 3f76247..1850b9a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVManagerInSpark.scala
@@ -83,21 +83,25 @@ object MVManagerInSpark {
   }
 
   /**
-   * when first time MVCatalogs are initialized, it stores session info also,
-   * but when carbon session is newly created, catalog map will not be cleared,
-   * so if session info is different, remove the entry from map.
+   * when first time MVCatalogs are initialized, it stores with session info.
+   * but when mv schemas are updated in other sessions or carbon session is newly created,
+   * need to update the schemas of existing catalog or create new catalog and load entries.
    */
-  def getOrReloadMVCatalog(sparkSession: SparkSession): MVCatalogInSpark = {
+  def getMVCatalog(sparkSession: SparkSession): MVCatalogInSpark = {
     val catalogFactory = new MVCatalogFactory[MVSchemaWrapper] {
       override def newCatalog(): MVCatalog[MVSchemaWrapper] = {
         new MVCatalogInSpark(sparkSession)
       }
     }
     val viewManager = MVManagerInSpark.get(sparkSession)
-    var viewCatalog = viewManager.getCatalog(catalogFactory, false).asInstanceOf[MVCatalogInSpark]
-    if (!viewCatalog.session.equals(sparkSession)) {
-      viewCatalog = viewManager.getCatalog(catalogFactory, true).asInstanceOf[MVCatalogInSpark]
+    var currSchemas: util.List[MVSchema] = new util.ArrayList[MVSchema]()
+    var viewCatalog = viewManager.getCatalog
+    if (viewCatalog != null) {
+      currSchemas = viewCatalog.asInstanceOf[MVCatalogInSpark]
+        .getAllSchemas.toStream.map(_.viewSchema).toList.asJava
     }
-    viewCatalog
+    // update the schemas in catalog
+    viewCatalog = viewManager.getCatalog(catalogFactory, currSchemas)
+    viewCatalog.asInstanceOf[MVCatalogInSpark]
   }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 5d89a12..8234c27 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -93,7 +93,7 @@ case class CarbonCreateMVCommand(
     withEvents(CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
       CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier)) {
       // get mv catalog
-      val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+      val viewCatalog = MVManagerInSpark.getMVCatalog(session)
       val schema = doCreate(session, identifier, viewManager, viewCatalog)
 
       // Update the related mv tables property to mv fact tables
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index 937dd6b..6a6e6d5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -48,7 +48,7 @@ case class CarbonRefreshMVCommand(
         throw new MalformedMVCommandException(
           s"Materialized view $databaseName.$mvName does not exist")
       }
-      val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+      val viewCatalog = MVManagerInSpark.getMVCatalog(session)
       if (!viewCatalog.getAllSchemas.exists(_.viewSchema.getIdentifier.getTableName
           .equals(schema.getIdentifier.getTableName))) {
         try {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
index 744d0ff..b5e983f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewriteRule.scala
@@ -50,6 +50,7 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     logicalPlan match {
       case _: Command => return logicalPlan
       case _: LocalRelation => return logicalPlan
+      case _: DeserializeToObject => return logicalPlan
       case _ =>
     }
     try {
@@ -105,7 +106,7 @@ class MVRewriteRule(session: SparkSession) extends Rule[LogicalPlan] {
     if (!canApply) {
       return logicalPlan
     }
-    val viewCatalog = MVManagerInSpark.getOrReloadMVCatalog(session)
+    val viewCatalog = MVManagerInSpark.getMVCatalog(session)
     if (viewCatalog != null && hasSuitableMV(logicalPlan, viewCatalog)) {
       LOGGER.debug(s"Query Rewrite has been initiated for the plan: " +
                    s"${ logicalPlan.toString().trim }")