You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/12 19:06:50 UTC

[carbondata] branch master updated: [CARBONDATA-3814]Remove dead code and refactor MV events

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c063610  [CARBONDATA-3814]Remove dead code and refactor MV events
c063610 is described below

commit c063610c6f021ca45abf7b065ff6aa24f01bb24e
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Thu May 7 22:24:58 2020 +0530

    [CARBONDATA-3814]Remove dead code and refactor MV events
    
    Why is this PR needed?
    Some code in the Index Events is unused, and the MV events should be kept in one place
    
    What changes were proposed in this PR?
    Removed the dead code and refactored the existing MV events classes
    
    This closes #3755
---
 .../org/apache/carbondata/events/Events.scala      |  2 +-
 .../org/apache/carbondata/events/IndexEvents.scala | 37 ---------------------
 .../org/apache/carbondata/view/MVEvents.scala      | 38 ++++++++++++++--------
 .../command/view/CarbonCreateMVCommand.scala       | 10 ++++--
 .../command/view/CarbonDropMVCommand.scala         | 11 +++++--
 .../command/view/CarbonRefreshMVCommand.scala      | 10 +++---
 6 files changed, 48 insertions(+), 60 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
index a400275..a6f0457 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -195,7 +195,7 @@ trait CreateCarbonRelationEventInfo {
  */
 trait MVEventsInfo {
   val sparkSession: SparkSession
-  val storePath: String
+  val systemDirectoryPath: String
 }
 
 /**
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
index 09b9891..8c393ed 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
@@ -23,43 +23,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 
 /**
- * For handling operation's before start of MV creation
- */
-case class CreateMVPreExecutionEvent(
-    sparkSession: SparkSession,
-    storePath: String,
-    tableIdentifier: TableIdentifier)
-  extends Event with MVEventsInfo
-
-/**
- * For handling operation's after finish of MV creation
- */
-case class CreateMVPostExecutionEvent(
-    sparkSession: SparkSession,
-    storePath: String,
-    tableIdentifier: Option[TableIdentifier],
-    dmProviderName: String)
-  extends Event with MVEventsInfo
-
-/**
- * For handling operation's before start of update MV status
- */
-case class UpdateMVPreExecutionEvent(
-    sparkSession: SparkSession,
-    storePath: String,
-    tableIdentifier: TableIdentifier)
-  extends Event with MVEventsInfo
-
-/**
- * For handling operation's after finish of  update MV table
- */
-case class UpdateMVPostExecutionEvent(
-    sparkSession: SparkSession,
-    storePath: String,
-    tableIdentifier: TableIdentifier)
-  extends Event with MVEventsInfo
-
-/**
  * For handling operation's before start of index build over table with index
  * example: bloom index, Lucene index
  */
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
index 5274ea7..75bd13a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
@@ -20,31 +20,43 @@ package org.apache.carbondata.view
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 
-import org.apache.carbondata.events.Event
+import org.apache.carbondata.events.{Event, MVEventsInfo}
 
 /**
- * For handling operation's before start of mv creation
+ * For handling operation's before start of MV creation
  */
-case class CreateMVPreExecutionEvent(sparkSession: SparkSession,
-    tableIdentifier: TableIdentifier) extends Event
+case class CreateMVPreExecutionEvent(
+    sparkSession: SparkSession,
+    systemDirectoryPath: String,
+    tableIdentifier: TableIdentifier)
+  extends Event with MVEventsInfo
 
 /**
- * For handling operation's after finish of mv creation
+ * For handling operation's after finish of MV creation
  */
-case class CreateMVPostExecutionEvent(sparkSession: SparkSession,
-    tableIdentifier: TableIdentifier) extends Event
+case class CreateMVPostExecutionEvent(
+    sparkSession: SparkSession,
+    systemDirectoryPath: String,
+    tableIdentifier: TableIdentifier)
+  extends Event with MVEventsInfo
 
 /**
- * For handling operation's before start of update mv status
+ * For handling operation's before start of update MV status
  */
-case class UpdateMVPreExecutionEvent(sparkSession: SparkSession,
-    tableIdentifier: TableIdentifier) extends Event
+case class UpdateMVPreExecutionEvent(
+    sparkSession: SparkSession,
+    systemDirectoryPath: String,
+    tableIdentifier: TableIdentifier)
+  extends Event with MVEventsInfo
 
 /**
- * For handling operation's after finish of update mv status
+ * For handling operation's after finish of  update MV table
  */
-case class UpdateMVPostExecutionEvent(sparkSession: SparkSession,
-    tableIdentifier: TableIdentifier) extends Event
+case class UpdateMVPostExecutionEvent(
+    sparkSession: SparkSession,
+    systemDirectoryPath: String,
+    tableIdentifier: TableIdentifier)
+  extends Event with MVEventsInfo
 
 /**
  * For handling operation's before start of mv refresh
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 96ac24f..22c4b52 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.view._
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, SimpleModularizer}
@@ -84,9 +85,14 @@ case class CarbonCreateMVCommand(
     }
 
     val identifier = TableIdentifier(name, Option(databaseName))
+    val databaseLocation = viewManager.getDatabaseLocation(databaseName)
+    val systemDirectoryPath = CarbonProperties.getInstance()
+      .getSystemFolderLocationPerDatabase(FileFactory
+        .getCarbonFile(databaseLocation)
+        .getCanonicalPath)
     val operationContext: OperationContext = new OperationContext()
     OperationListenerBus.getInstance().fireEvent(
-      CreateMVPreExecutionEvent(session, identifier),
+      CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
       operationContext)
 
     val catalogFactory = new MVCatalogFactory[MVSchemaWrapper] {
@@ -121,7 +127,7 @@ case class CarbonCreateMVCommand(
     }
 
     OperationListenerBus.getInstance().fireEvent(
-      CreateMVPostExecutionEvent(session, identifier),
+      CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
       operationContext)
 
     this.viewSchema = schema
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
index d521f9f..5aa5a04 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.view.{MVCatalogInSpark, MVManagerInSpark, UpdateMVPostExecutionEvent, UpdateMVPreExecutionEvent}
 
@@ -52,14 +54,19 @@ case class CarbonDropMVCommand(
       val schema = viewManager.getSchema(databaseName, name)
       if (schema != null) {
         // Drop mv status.
+        val databaseLocation = viewManager.getDatabaseLocation(databaseName)
+        val systemDirectoryPath = CarbonProperties.getInstance()
+          .getSystemFolderLocationPerDatabase(FileFactory
+            .getCarbonFile(databaseLocation)
+            .getCanonicalPath)
         val identifier = TableIdentifier(name, Option(databaseName))
         val operationContext = new OperationContext()
         OperationListenerBus.getInstance().fireEvent(
-          UpdateMVPreExecutionEvent(session, identifier),
+          UpdateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
           operationContext)
         viewManager.onDrop(databaseName, name)
         OperationListenerBus.getInstance().fireEvent(
-          UpdateMVPostExecutionEvent(session, identifier),
+          UpdateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
           operationContext)
 
         // Drop mv table.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index 233f8c5..bec2925 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -33,26 +33,26 @@ import org.apache.carbondata.view.{MVManagerInSpark, MVRefresher, RefreshMVPostE
  */
 case class CarbonRefreshMVCommand(
     databaseNameOption: Option[String],
-    name: String) extends DataCommand {
+    mvName: String) extends DataCommand {
 
   override def processData(session: SparkSession): Seq[Row] = {
     val databaseName =
       databaseNameOption.getOrElse(session.sessionState.catalog.getCurrentDatabase)
     val viewManager = MVManagerInSpark.get(session)
     val schema = try {
-      viewManager.getSchema(databaseName, name)
+      viewManager.getSchema(databaseName, mvName)
     } catch {
       case _: NoSuchMVException =>
         throw new MalformedMVCommandException(
-          s"Materialized view ${ databaseName }.${ name } does not exist")
+          s"Materialized view ${ databaseName }.${ mvName } does not exist")
     }
-    val table = CarbonEnv.getCarbonTable(Option(databaseName), name)(session)
+    val table = CarbonEnv.getCarbonTable(Option(databaseName), mvName)(session)
     setAuditTable(table)
 
     MVRefresher.refresh(schema, session)
 
     // After rebuild successfully enable the MV table.
-    val identifier = TableIdentifier(name, Option(databaseName))
+    val identifier = TableIdentifier(mvName, Option(databaseName))
     val operationContext = new OperationContext()
     OperationListenerBus.getInstance().fireEvent(
       RefreshMVPreExecutionEvent(session, identifier),