Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/21 13:59:50 UTC

[carbondata] branch master updated: [CARBONDATA-3907] Refactor to use CommonLoadUtils APIs firePreLoadEvents and firePostLoadEvents to trigger Load pre and post events

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 797719c  [CARBONDATA-3907] Refactor to use CommonLoadUtils APIs firePreLoadEvents and firePostLoadEvents to trigger Load pre and post events
797719c is described below

commit 797719ce929266aca1cc545a35e9cc42a5994358
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Jul 16 16:37:06 2020 +0530

    [CARBONDATA-3907] Refactor to use CommonLoadUtils APIs firePreLoadEvents and
    firePostLoadEvents to trigger Load pre and post events
    
    Why is this PR needed?
    Currently there are two different ways of firing LoadTablePreExecutionEvent and
    LoadTablePostExecutionEvent. We can reuse the firePreLoadEvents and firePostLoadEvents
    methods from CommonLoadUtils to trigger LoadTablePreExecutionEvent and
    LoadTablePostExecutionEvent respectively in the alter table add segment flow as well.
    
    What changes were proposed in this PR?
    Reuse the firePreLoadEvents and firePostLoadEvents methods from CommonLoadUtils to trigger
    LoadTablePreExecutionEvent and LoadTablePostExecutionEvent respectively in the alter table
    add segment flow.
    
    This closes #3850
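
    For reference, the consolidated call pattern looks roughly as follows. This is
    a minimal sketch assembled from the call sites in the diff below; the parameter
    meanings noted in the comments (uuid, isDataFrame, updateModel) are assumptions
    inferred from the arguments, not confirmed signatures.

        // context: sparkSession, carbonLoadModel, options and carbonTable
        // are assumed to be in scope, as at the call sites below
        val operationContext = new OperationContext
        // fires LoadTablePreExecutionEvent and, when the table has CG/FG indexes,
        // BuildIndexPreExecutionEvent; returns the table indexes and the index
        // operation context needed by the post call
        val (tableIndexes, indexOperationContext) =
          CommonLoadUtils.firePreLoadEvents(sparkSession,
            carbonLoadModel,
            "",                               // uuid (assumption)
            carbonLoadModel.getFactFilePath,  // fact file or segment path
            options.asJava,                   // options
            options.asJava,                   // optionsFinal (same here)
            false,                            // isOverwriteTable
            false,                            // isDataFrame (assumption)
            None,                             // updateModel (assumption)
            operationContext)
        // ... perform the load ...
        // fires LoadTablePostExecutionEvent and, when indexes exist,
        // BuildIndexPostExecutionEvent
        CommonLoadUtils.firePostLoadEvents(sparkSession,
          carbonLoadModel,
          tableIndexes,
          indexOperationContext,
          carbonTable,
          operationContext)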
---
 .../carbondata/streaming/StreamSinkFactory.scala   | 25 ++++++------
 .../command/management/CarbonAddLoadCommand.scala  | 45 ++++++++--------------
 .../streaming/CarbonAppendableStreamSink.scala     | 25 ++++++------
 .../processing/loading/events/LoadEvents.java      |  6 ---
 4 files changed, 42 insertions(+), 59 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 0502d57..e76fad4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonBadRecordUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
@@ -94,16 +94,16 @@ object StreamSinkFactory {
     // fire pre event before streaming is started
     // in case of streaming, options and optionsFinal can be the same
     val operationContext = new OperationContext
-    val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
-      carbonTable.getCarbonTableIdentifier,
+    val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
       carbonLoadModel,
+      "",
       carbonLoadModel.getFactFilePath,
-      false,
       parameters.asJava,
       parameters.asJava,
-      false
-    )
-    OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+      false,
+      false,
+      None,
+      operationContext)
     // prepare the stream segment
     val segmentId = getStreamSegmentId(carbonTable)
     carbonLoadModel.setSegmentId(segmentId)
@@ -118,11 +118,12 @@ object StreamSinkFactory {
       operationContext)
 
     // fire post event before streaming is started
-    val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
-      carbonTable.getCarbonTableIdentifier,
-      carbonLoadModel
-    )
-    OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+    CommonLoadUtils.firePostLoadEvents(sparkSession,
+      carbonLoadModel,
+      tableIndexes,
+      indexOperationContext,
+      carbonTable,
+      operationContext)
     carbonAppendableStreamSink
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index facbb58..a98cec5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -228,24 +228,16 @@ case class CarbonAddLoadCommand(
     model.setTableName(carbonTable.getTableName)
     val operationContext = new OperationContext
     operationContext.setProperty("isLoadOrCompaction", false)
-    val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
-      new LoadTablePreExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
-        model)
-    operationContext.setProperty("isOverwrite", false)
-    OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
-    // Add pre event listener for index indexSchema
-    val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
-    val indexOperationContext = new OperationContext()
-    if (tableIndexes.size() > 0) {
-      val indexNames: mutable.Buffer[String] =
-        tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
-      val buildIndexPreExecutionEvent: BuildIndexPreExecutionEvent =
-        BuildIndexPreExecutionEvent(
-          sparkSession, carbonTable.getAbsoluteTableIdentifier, indexNames)
-      OperationListenerBus.getInstance().fireEvent(buildIndexPreExecutionEvent,
-        indexOperationContext)
-    }
+    val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
+      model,
+      "",
+      segmentPath,
+      options.asJava,
+      options.asJava,
+      false,
+      false,
+      None,
+      operationContext)
 
     val newLoadMetaEntry = new LoadMetadataDetails
     model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
@@ -344,17 +336,12 @@ case class CarbonAddLoadCommand(
       }
     }
     viewManager.setStatus(viewSchemas, MVStatus.DISABLED)
-    val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
-      new LoadTablePostExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
-        model)
-    OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
-    if (tableIndexes.size() > 0) {
-      val buildIndexPostExecutionEvent = BuildIndexPostExecutionEvent(sparkSession,
-        carbonTable.getAbsoluteTableIdentifier, null, Seq(model.getSegmentId), false)
-      OperationListenerBus.getInstance()
-        .fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
-    }
+    CommonLoadUtils.firePostLoadEvents(sparkSession,
+      model,
+      tableIndexes,
+      indexOperationContext,
+      carbonTable,
+      operationContext)
   }
 
   // extract partition column and value, for example, given
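
The inline blocks deleted above are what the CommonLoadUtils helpers now
encapsulate. The sketch below reconstructs the subsumed behavior from the
removed lines; it shows the visible effect of the helpers, not their actual
bodies, and assumes the same names that were in scope in the deleted code.

    // inside firePreLoadEvents (reconstruction from the removed lines)
    OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
    val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
    val indexOperationContext = new OperationContext()
    if (tableIndexes.size() > 0) {
      val indexNames = tableIndexes.asScala.map(_.getIndexSchema.getIndexName)
      OperationListenerBus.getInstance().fireEvent(
        BuildIndexPreExecutionEvent(sparkSession,
          carbonTable.getAbsoluteTableIdentifier, indexNames),
        indexOperationContext)
    }

    // inside firePostLoadEvents (reconstruction from the removed lines)
    OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
    if (tableIndexes.size() > 0) {
      OperationListenerBus.getInstance().fireEvent(
        BuildIndexPostExecutionEvent(sparkSession,
          carbonTable.getAbsoluteTableIdentifier, null,
          Seq(model.getSegmentId), false),
        indexOperationContext)
    }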
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 3eeaa54..8d6e8e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -47,7 +48,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.rdd.StreamHandoffRDD
 import org.apache.carbondata.spark.util.CommonUtil
@@ -126,16 +126,16 @@ class CarbonAppendableStreamSink(
 
       // fire pre event on every batch add
       // in case of streaming, options and optionsFinal can be the same
-      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
+      val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
         carbonLoadModel,
+        "",
         carbonLoadModel.getFactFilePath,
-        false,
         parameters.asJava,
         parameters.asJava,
-        false
-      )
-      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+        false,
+        false,
+        None,
+        operationContext)
       checkOrHandOffSegment()
 
       // committer will record how this spark job commit its output
@@ -162,11 +162,12 @@ class CarbonAppendableStreamSink(
         carbonLoadModel,
         msrDataTypes)
       // fire post event on every batch add
-      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
-        carbonLoadModel
-      )
-      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+      CommonLoadUtils.firePostLoadEvents(sparkSession,
+        carbonLoadModel,
+        tableIndexes,
+        indexOperationContext,
+        carbonTable,
+        operationContext)
 
       statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
       CarbonAppendableStreamSink.LOGGER.info(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 924cd62..4958950 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -51,12 +51,6 @@ public class LoadEvents {
       this.isOverWriteTable = isOverWriteTable;
     }
 
-    public LoadTablePreExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
-        CarbonLoadModel carbonLoadModel) {
-      this.carbonTableIdentifier = carbonTableIdentifier;
-      this.carbonLoadModel = carbonLoadModel;
-    }
-
     public CarbonTableIdentifier getCarbonTableIdentifier() {
       return carbonTableIdentifier;
     }
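
With every call site now routed through CommonLoadUtils, the two-argument
convenience constructor removed above had no remaining users. Any direct
instantiation that bypasses the helpers has to use the full constructor. Its
shape, copied from the pre-refactor call in StreamSinkFactory above (the
meanings of the two boolean arguments are assumptions based on that call
site), is:

    new LoadTablePreExecutionEvent(
      carbonTable.getCarbonTableIdentifier,
      carbonLoadModel,
      carbonLoadModel.getFactFilePath,
      false,             // isDataFrame (assumption)
      parameters.asJava,
      parameters.asJava,
      false)             // isOverwriteTable (assumption)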