You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/12 19:06:50 UTC
[carbondata] branch master updated: [CARBONDATA-3814]Remove dead code and refactor MV events
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c063610 [CARBONDATA-3814]Remove dead code and refactor MV events
c063610 is described below
commit c063610c6f021ca45abf7b065ff6aa24f01bb24e
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Thu May 7 22:24:58 2020 +0530
[CARBONDATA-3814]Remove dead code and refactor MV events
Why is this PR needed?
Some code in IndexEvents is not used, and the MV events should all be defined in one place.
What changes were proposed in this PR?
Removed the dead code and refactored the existing MV event classes.
This closes #3755
---
.../org/apache/carbondata/events/Events.scala | 2 +-
.../org/apache/carbondata/events/IndexEvents.scala | 37 ---------------------
.../org/apache/carbondata/view/MVEvents.scala | 38 ++++++++++++++--------
.../command/view/CarbonCreateMVCommand.scala | 10 ++++--
.../command/view/CarbonDropMVCommand.scala | 11 +++++--
.../command/view/CarbonRefreshMVCommand.scala | 10 +++---
6 files changed, 48 insertions(+), 60 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
index a400275..a6f0457 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -195,7 +195,7 @@ trait CreateCarbonRelationEventInfo {
*/
trait MVEventsInfo {
val sparkSession: SparkSession
- val storePath: String
+ val systemDirectoryPath: String
}
/**
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
index 09b9891..8c393ed 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/events/IndexEvents.scala
@@ -23,43 +23,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
/**
- * For handling operation's before start of MV creation
- */
-case class CreateMVPreExecutionEvent(
- sparkSession: SparkSession,
- storePath: String,
- tableIdentifier: TableIdentifier)
- extends Event with MVEventsInfo
-
-/**
- * For handling operation's after finish of MV creation
- */
-case class CreateMVPostExecutionEvent(
- sparkSession: SparkSession,
- storePath: String,
- tableIdentifier: Option[TableIdentifier],
- dmProviderName: String)
- extends Event with MVEventsInfo
-
-/**
- * For handling operation's before start of update MV status
- */
-case class UpdateMVPreExecutionEvent(
- sparkSession: SparkSession,
- storePath: String,
- tableIdentifier: TableIdentifier)
- extends Event with MVEventsInfo
-
-/**
- * For handling operation's after finish of update MV table
- */
-case class UpdateMVPostExecutionEvent(
- sparkSession: SparkSession,
- storePath: String,
- tableIdentifier: TableIdentifier)
- extends Event with MVEventsInfo
-
-/**
* For handling operation's before start of index build over table with index
* example: bloom index, Lucene index
*/
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
index 5274ea7..75bd13a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVEvents.scala
@@ -20,31 +20,43 @@ package org.apache.carbondata.view
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.carbondata.events.Event
+import org.apache.carbondata.events.{Event, MVEventsInfo}
/**
- * For handling operation's before start of mv creation
+ * For handling operation's before start of MV creation
*/
-case class CreateMVPreExecutionEvent(sparkSession: SparkSession,
- tableIdentifier: TableIdentifier) extends Event
+case class CreateMVPreExecutionEvent(
+ sparkSession: SparkSession,
+ systemDirectoryPath: String,
+ tableIdentifier: TableIdentifier)
+ extends Event with MVEventsInfo
/**
- * For handling operation's after finish of mv creation
+ * For handling operation's after finish of MV creation
*/
-case class CreateMVPostExecutionEvent(sparkSession: SparkSession,
- tableIdentifier: TableIdentifier) extends Event
+case class CreateMVPostExecutionEvent(
+ sparkSession: SparkSession,
+ systemDirectoryPath: String,
+ tableIdentifier: TableIdentifier)
+ extends Event with MVEventsInfo
/**
- * For handling operation's before start of update mv status
+ * For handling operation's before start of update MV status
*/
-case class UpdateMVPreExecutionEvent(sparkSession: SparkSession,
- tableIdentifier: TableIdentifier) extends Event
+case class UpdateMVPreExecutionEvent(
+ sparkSession: SparkSession,
+ systemDirectoryPath: String,
+ tableIdentifier: TableIdentifier)
+ extends Event with MVEventsInfo
/**
- * For handling operation's after finish of update mv status
+ * For handling operation's after finish of update MV table
*/
-case class UpdateMVPostExecutionEvent(sparkSession: SparkSession,
- tableIdentifier: TableIdentifier) extends Event
+case class UpdateMVPostExecutionEvent(
+ sparkSession: SparkSession,
+ systemDirectoryPath: String,
+ tableIdentifier: TableIdentifier)
+ extends Event with MVEventsInfo
/**
* For handling operation's before start of mv refresh
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 96ac24f..22c4b52 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.view._
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, SimpleModularizer}
@@ -84,9 +85,14 @@ case class CarbonCreateMVCommand(
}
val identifier = TableIdentifier(name, Option(databaseName))
+ val databaseLocation = viewManager.getDatabaseLocation(databaseName)
+ val systemDirectoryPath = CarbonProperties.getInstance()
+ .getSystemFolderLocationPerDatabase(FileFactory
+ .getCarbonFile(databaseLocation)
+ .getCanonicalPath)
val operationContext: OperationContext = new OperationContext()
OperationListenerBus.getInstance().fireEvent(
- CreateMVPreExecutionEvent(session, identifier),
+ CreateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
val catalogFactory = new MVCatalogFactory[MVSchemaWrapper] {
@@ -121,7 +127,7 @@ case class CarbonCreateMVCommand(
}
OperationListenerBus.getInstance().fireEvent(
- CreateMVPostExecutionEvent(session, identifier),
+ CreateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
this.viewSchema = schema
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
index d521f9f..5aa5a04 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonDropMVCommand.scala
@@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.view.{MVCatalogInSpark, MVManagerInSpark, UpdateMVPostExecutionEvent, UpdateMVPreExecutionEvent}
@@ -52,14 +54,19 @@ case class CarbonDropMVCommand(
val schema = viewManager.getSchema(databaseName, name)
if (schema != null) {
// Drop mv status.
+ val databaseLocation = viewManager.getDatabaseLocation(databaseName)
+ val systemDirectoryPath = CarbonProperties.getInstance()
+ .getSystemFolderLocationPerDatabase(FileFactory
+ .getCarbonFile(databaseLocation)
+ .getCanonicalPath)
val identifier = TableIdentifier(name, Option(databaseName))
val operationContext = new OperationContext()
OperationListenerBus.getInstance().fireEvent(
- UpdateMVPreExecutionEvent(session, identifier),
+ UpdateMVPreExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
viewManager.onDrop(databaseName, name)
OperationListenerBus.getInstance().fireEvent(
- UpdateMVPostExecutionEvent(session, identifier),
+ UpdateMVPostExecutionEvent(session, systemDirectoryPath, identifier),
operationContext)
// Drop mv table.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
index 233f8c5..bec2925 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonRefreshMVCommand.scala
@@ -33,26 +33,26 @@ import org.apache.carbondata.view.{MVManagerInSpark, MVRefresher, RefreshMVPostE
*/
case class CarbonRefreshMVCommand(
databaseNameOption: Option[String],
- name: String) extends DataCommand {
+ mvName: String) extends DataCommand {
override def processData(session: SparkSession): Seq[Row] = {
val databaseName =
databaseNameOption.getOrElse(session.sessionState.catalog.getCurrentDatabase)
val viewManager = MVManagerInSpark.get(session)
val schema = try {
- viewManager.getSchema(databaseName, name)
+ viewManager.getSchema(databaseName, mvName)
} catch {
case _: NoSuchMVException =>
throw new MalformedMVCommandException(
- s"Materialized view ${ databaseName }.${ name } does not exist")
+ s"Materialized view ${ databaseName }.${ mvName } does not exist")
}
- val table = CarbonEnv.getCarbonTable(Option(databaseName), name)(session)
+ val table = CarbonEnv.getCarbonTable(Option(databaseName), mvName)(session)
setAuditTable(table)
MVRefresher.refresh(schema, session)
// After rebuild successfully enable the MV table.
- val identifier = TableIdentifier(name, Option(databaseName))
+ val identifier = TableIdentifier(mvName, Option(databaseName))
val operationContext = new OperationContext()
OperationListenerBus.getInstance().fireEvent(
RefreshMVPreExecutionEvent(session, identifier),