You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:51 UTC
[50/50] [abbrv] carbondata git commit: [CARBONDATA-2623][DataMap] Add
DataMap Pre and Pevent listener
[CARBONDATA-2623][DataMap] Add DataMap Pre and Pevent listener
Added Pre and Post Execution Events for index datamap
This closes #2389
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b3f78206
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b3f78206
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b3f78206
Branch: refs/heads/carbonstore
Commit: b3f7820623d4bc9ab4408beb8ad708ba9b19b899
Parents: 55f4bc6
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Wed Jun 20 19:52:51 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Jun 21 17:37:48 2018 +0530
----------------------------------------------------------------------
.../carbondata/events/DataMapEvents.scala | 68 ++++++++++++++++++++
.../org/apache/carbondata/events/Events.scala | 18 +++++-
.../datamap/IndexDataMapRebuildRDD.scala | 11 +++-
.../spark/rdd/CarbonTableCompactor.scala | 23 ++++++-
.../datamap/CarbonCreateDataMapCommand.scala | 22 +++++++
.../datamap/CarbonDataMapRebuildCommand.scala | 12 ++++
.../datamap/CarbonDropDataMapCommand.scala | 11 ++++
.../management/CarbonLoadDataCommand.scala | 21 +++++-
8 files changed, 181 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
new file mode 100644
index 0000000..8fb374f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * For handling operation's after finish of index creation over table with index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPostExecutionEvent(sparkSession: SparkSession,
+ storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operation's before start of update index datmap status over table with index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPreExecutionEvent(sparkSession: SparkSession,
+ storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operation's after finish of update index datmap status over table with index
+ * datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPostExecutionEvent(sparkSession: SparkSession,
+ storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operation's before start of index build over table with index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
+ identifier: AbsoluteTableIdentifier, dataMapNames: scala.collection.mutable.Seq[String])
+ extends Event with BuildDataMapEventsInfo
+
+/**
+ * For handling operation's after finish of index build over table with index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
+ identifier: AbsoluteTableIdentifier)
+ extends Event with TableEventInfo
+
+/**
+ * For handling operation's before start of index creation over table with index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPreExecutionEvent(sparkSession: SparkSession,
+ storePath: String) extends Event with CreateDataMapEventsInfo
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index da62e02..1830a35 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
-import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -168,3 +167,20 @@ trait DeleteFromTableEventInfo {
trait SessionEventInfo {
val sparkSession: SparkSession
}
+
+/**
+ * Event info for create datamap
+ */
+trait CreateDataMapEventsInfo {
+ val sparkSession: SparkSession
+ val storePath: String
+}
+
+/**
+ * Event info for build datamap
+ */
+trait BuildDataMapEventsInfo {
+ val sparkSession: SparkSession
+ val identifier: AbsoluteTableIdentifier
+ val dataMapNames: scala.collection.mutable.Seq[String]
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index cde6201..d064306 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
import java.util
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.TaskMetricsMap
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
@@ -67,13 +69,20 @@ object IndexDataMapRebuildRDD {
val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
val validSegments = validAndInvalidSegments.getValidSegments
val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
-
+ val operationContext = new OperationContext()
+ val buildDataMapPreExecutionEvent = new BuildDataMapPreExecutionEvent(sparkSession,
+ tableIdentifier,
+ mutable.Seq[String](schema.getDataMapName))
+ OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext)
// loop all segments to rebuild DataMap
validSegments.asScala.foreach { segment =>
// if lucene datamap folder is exists, not require to build lucene datamap again
refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
indexedCarbonColumns, segment.getSegmentNo);
}
+ val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
+ tableIdentifier)
+ OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
}
private def refreshOneSegment(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 7605b9d..fcc649e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -22,12 +22,13 @@ import java.util.List
import java.util.concurrent.ExecutorService
import scala.collection.JavaConverters._
+import scala.collection.mutable
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
@@ -156,7 +157,18 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
carbonMergerMapping,
mergedLoadName)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
-
+ // Add pre event listener for index datamap
+ val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+ val dataMapOperationContext = new OperationContext()
+ if (null != tableDataMaps) {
+ val dataMapNames: mutable.Buffer[String] =
+ tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+ val dataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+ new BuildDataMapPreExecutionEvent(sqlContext.sparkSession,
+ carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+ OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
+ dataMapOperationContext)
+ }
var execInstance = "1"
// in case of non dynamic executor allocation, number of executors are fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
@@ -272,6 +284,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
mergedLoadName)
OperationListenerBus.getInstance()
.fireEvent(compactionLoadStatusPostEvent, operationContext)
+ if (null != tableDataMaps) {
+ val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+ new BuildDataMapPostExecutionEvent(sqlContext.sparkSession,
+ carbonTable.getAbsoluteTableIdentifier)
+ OperationListenerBus.getInstance()
+ .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+ }
val commitDone = operationContext.getProperty("commitComplete")
val commitComplete = if (null != commitDone) {
commitDone.toString.toBoolean
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 1ae872a..27e1720 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -31,7 +31,9 @@ import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
+import org.apache.carbondata.events._
/**
* Below command class will be used to create datamap on table
@@ -108,8 +110,18 @@ case class CarbonCreateDataMapCommand(
"column '%s' already has datamap created", column.getColName))
}
}
+ val operationContext: OperationContext = new OperationContext()
+ val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+ val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
+ new CreateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
+ operationContext)
dataMapProvider.initMeta(queryString.orNull)
DataMapStatusManager.disableDataMap(dataMapName)
+ val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent =
+ new CreateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
+ operationContext)
case _ =>
if (deferredRebuild) {
throw new MalformedDataMapCommandException(
@@ -128,7 +140,17 @@ case class CarbonCreateDataMapCommand(
if (mainTable != null && !deferredRebuild) {
dataMapProvider.rebuild()
if (dataMapSchema.isIndexDataMap) {
+ val operationContext: OperationContext = new OperationContext()
+ val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+ val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+ new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+ operationContext)
DataMapStatusManager.enableDataMap(dataMapName)
+ val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+ new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+ operationContext)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index 6493c83..beadc7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.execution.command.DataCommand
import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+import org.apache.carbondata.events.{UpdateDataMapPostExecutionEvent, _}
/**
* Rebuild the datamaps through sync with main table data. After sync with parent table's it enables
@@ -49,7 +51,17 @@ case class CarbonDataMapRebuildCommand(
provider.rebuild()
// After rebuild successfully enable the datamap.
+ val operationContext: OperationContext = new OperationContext()
+ val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+ val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+ new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+ operationContext)
DataMapStatusManager.enableDataMap(dataMapName)
+ val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+ new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+ operationContext)
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f1ed5d1..722119e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
import org.apache.carbondata.events._
@@ -197,7 +198,17 @@ case class CarbonDropDataMapCommand(
if (dataMapSchema != null) {
dataMapProvider =
DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
+ val operationContext: OperationContext = new OperationContext()
+ val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+ val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+ UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+ operationContext)
DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+ val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+ UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+ OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+ operationContext)
// if it is indexDataMap provider like lucene, then call cleanData, which will launch a job
// to clear datamap from memory(clears from segmentMap and cache), This is called before
// deleting the datamap schemas from _System folder
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 69db3ea..38bdbcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -62,7 +62,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
@@ -233,6 +233,18 @@ case class CarbonLoadDataCommand(
isOverwriteTable)
operationContext.setProperty("isOverwrite", isOverwriteTable)
OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+ // Add pre event listener for index datamap
+ val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
+ val dataMapOperationContext = new OperationContext()
+ if (null != tableDataMaps) {
+ val dataMapNames: mutable.Buffer[String] =
+ tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+ val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+ new BuildDataMapPreExecutionEvent(sparkSession,
+ table.getAbsoluteTableIdentifier, dataMapNames)
+ OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+ dataMapOperationContext)
+ }
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
@@ -300,6 +312,13 @@ case class CarbonLoadDataCommand(
table.getCarbonTableIdentifier,
carbonLoadModel)
OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+ if (null != tableDataMaps) {
+ val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+ BuildDataMapPostExecutionEvent(sparkSession, table.getAbsoluteTableIdentifier)
+ OperationListenerBus.getInstance()
+ .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+ }
+
} catch {
case CausedBy(ex: NoRetryException) =>
// update the load entry in table status file for changing the status to marked for delete