You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/21 13:59:50 UTC
[carbondata] branch master updated: [CARBONDATA-3907]Refactor to
use CommonLoadUtils API's firePreLoadEvents and firePostLoadEvents to
trigger Load pre and post events
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 797719c [CARBONDATA-3907]Refactor to use CommonLoadUtils API's firePreLoadEvents and firePostLoadEvents to trigger Load pre and post events
797719c is described below
commit 797719ce929266aca1cc545a35e9cc42a5994358
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Thu Jul 16 16:37:06 2020 +0530
[CARBONDATA-3907]Refactor to use CommonLoadUtils API's firePreLoadEvents and firePostLoadEvents to
trigger Load pre and post events
Why is this PR needed?
Currently we have 2 different ways of firing LoadTablePreExecutionEvent and LoadTablePostExecutionEvent.
We can reuse firePreLoadEvents and firePostLoadEvents methods from CommonLoadUtils to trigger
LoadTablePreExecutionEvent and LoadTablePostExecutionEvent respectively in alter table add segment
flow as well.
What changes were proposed in this PR?
Reuse firePreLoadEvents and firePostLoadEvents methods from CommonLoadUtils to trigger
LoadTablePreExecutionEvent and LoadTablePostExecutionEvent respectively in alter table add segment flow.
This closes #3850
---
.../carbondata/streaming/StreamSinkFactory.scala | 25 ++++++------
.../command/management/CarbonAddLoadCommand.scala | 45 ++++++++--------------
.../streaming/CarbonAppendableStreamSink.scala | 25 ++++++------
.../processing/loading/events/LoadEvents.java | 6 ---
4 files changed, 42 insertions(+), 59 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 0502d57..e76fad4 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.CarbonBadRecordUtil
import org.apache.carbondata.streaming.segment.StreamSegment
@@ -94,16 +94,16 @@ object StreamSinkFactory {
// fire pre event before streaming is started
// in case of streaming options and optionsFinal can be same
val operationContext = new OperationContext
- val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
+ val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
carbonLoadModel,
+ "",
carbonLoadModel.getFactFilePath,
- false,
parameters.asJava,
parameters.asJava,
- false
- )
- OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+ false,
+ false,
+ None,
+ operationContext)
// prepare the stream segment
val segmentId = getStreamSegmentId(carbonTable)
carbonLoadModel.setSegmentId(segmentId)
@@ -118,11 +118,12 @@ object StreamSinkFactory {
operationContext)
// fire post event before streaming is started
- val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
- carbonLoadModel
- )
- OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+ CommonLoadUtils.firePostLoadEvents(sparkSession,
+ carbonLoadModel,
+ tableIndexes,
+ indexOperationContext,
+ carbonTable,
+ operationContext)
carbonAppendableStreamSink
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index facbb58..a98cec5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -228,24 +228,16 @@ case class CarbonAddLoadCommand(
model.setTableName(carbonTable.getTableName)
val operationContext = new OperationContext
operationContext.setProperty("isLoadOrCompaction", false)
- val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
- new LoadTablePreExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
- model)
- operationContext.setProperty("isOverwrite", false)
- OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
- // Add pre event listener for index indexSchema
- val tableIndexes = IndexStoreManager.getInstance().getAllCGAndFGIndexes(carbonTable)
- val indexOperationContext = new OperationContext()
- if (tableIndexes.size() > 0) {
- val indexNames: mutable.Buffer[String] =
- tableIndexes.asScala.map(index => index.getIndexSchema.getIndexName)
- val buildIndexPreExecutionEvent: BuildIndexPreExecutionEvent =
- BuildIndexPreExecutionEvent(
- sparkSession, carbonTable.getAbsoluteTableIdentifier, indexNames)
- OperationListenerBus.getInstance().fireEvent(buildIndexPreExecutionEvent,
- indexOperationContext)
- }
+ val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
+ model,
+ "",
+ segmentPath,
+ options.asJava,
+ options.asJava,
+ false,
+ false,
+ None,
+ operationContext)
val newLoadMetaEntry = new LoadMetadataDetails
model.setFactTimeStamp(CarbonUpdateUtil.readCurrentTime)
@@ -344,17 +336,12 @@ case class CarbonAddLoadCommand(
}
}
viewManager.setStatus(viewSchemas, MVStatus.DISABLED)
- val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
- new LoadTablePostExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
- model)
- OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
- if (tableIndexes.size() > 0) {
- val buildIndexPostExecutionEvent = BuildIndexPostExecutionEvent(sparkSession,
- carbonTable.getAbsoluteTableIdentifier, null, Seq(model.getSegmentId), false)
- OperationListenerBus.getInstance()
- .fireEvent(buildIndexPostExecutionEvent, indexOperationContext)
- }
+ CommonLoadUtils.firePostLoadEvents(sparkSession,
+ model,
+ tableIndexes,
+ indexOperationContext,
+ carbonTable,
+ operationContext)
}
// extract partition column and value, for example, given
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 3eeaa54..8d6e8e1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
+import org.apache.spark.sql.execution.command.management.CommonLoadUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -47,7 +48,6 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.StreamHandoffRDD
import org.apache.carbondata.spark.util.CommonUtil
@@ -126,16 +126,16 @@ class CarbonAppendableStreamSink(
// fire pre event on every batch add
// in case of streaming options and optionsFinal can be same
- val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
+ val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(sparkSession,
carbonLoadModel,
+ "",
carbonLoadModel.getFactFilePath,
- false,
parameters.asJava,
parameters.asJava,
- false
- )
- OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
+ false,
+ false,
+ None,
+ operationContext)
checkOrHandOffSegment()
// committer will record how this spark job commit its output
@@ -162,11 +162,12 @@ class CarbonAppendableStreamSink(
carbonLoadModel,
msrDataTypes)
// fire post event on every batch add
- val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
- carbonTable.getCarbonTableIdentifier,
- carbonLoadModel
- )
- OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
+ CommonLoadUtils.firePostLoadEvents(sparkSession,
+ carbonLoadModel,
+ tableIndexes,
+ indexOperationContext,
+ carbonTable,
+ operationContext)
statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
CarbonAppendableStreamSink.LOGGER.info(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 924cd62..4958950 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -51,12 +51,6 @@ public class LoadEvents {
this.isOverWriteTable = isOverWriteTable;
}
- public LoadTablePreExecutionEvent(CarbonTableIdentifier carbonTableIdentifier,
- CarbonLoadModel carbonLoadModel) {
- this.carbonTableIdentifier = carbonTableIdentifier;
- this.carbonLoadModel = carbonLoadModel;
- }
-
public CarbonTableIdentifier getCarbonTableIdentifier() {
return carbonTableIdentifier;
}