Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/06/21 12:04:19 UTC

carbondata git commit: [CARBONDATA-2623][DataMap] Add DataMap Pre and Post Event listener

Repository: carbondata
Updated Branches:
  refs/heads/master 55f4bc6c8 -> b3f782062


[CARBONDATA-2623][DataMap] Add DataMap Pre and Post Event listener

Added Pre and Post Execution Events for index datamaps

This closes #2389
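
For context: a component subscribes to these events through CarbonData's existing
OperationListenerBus/OperationEventListener API. A minimal sketch of a subscriber follows;
the listener class and the actions in its comments are illustrative assumptions, not part
of this commit:

    import org.apache.carbondata.events.{CreateDataMapPostExecutionEvent,
      CreateDataMapPreExecutionEvent, Event, OperationContext,
      OperationEventListener, OperationListenerBus}

    // Hypothetical subscriber; only the event classes are added by this commit.
    class DataMapAuditListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case pre: CreateDataMapPreExecutionEvent =>
            // e.g. validate access to pre.storePath before the datamap is created
          case post: CreateDataMapPostExecutionEvent =>
            // e.g. audit the files written under post.storePath
          case _ =>
        }
      }
    }

    object DataMapAuditListener {
      // Registration, typically done once during session initialization.
      def register(): Unit = {
        val listener = new DataMapAuditListener
        OperationListenerBus.getInstance()
          .addListener(classOf[CreateDataMapPreExecutionEvent], listener)
        OperationListenerBus.getInstance()
          .addListener(classOf[CreateDataMapPostExecutionEvent], listener)
      }
    }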


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b3f78206
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b3f78206
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b3f78206

Branch: refs/heads/master
Commit: b3f7820623d4bc9ab4408beb8ad708ba9b19b899
Parents: 55f4bc6
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Wed Jun 20 19:52:51 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Jun 21 17:37:48 2018 +0530

----------------------------------------------------------------------
 .../carbondata/events/DataMapEvents.scala       | 68 ++++++++++++++++++++
 .../org/apache/carbondata/events/Events.scala   | 18 +++++-
 .../datamap/IndexDataMapRebuildRDD.scala        | 11 +++-
 .../spark/rdd/CarbonTableCompactor.scala        | 23 ++++++-
 .../datamap/CarbonCreateDataMapCommand.scala    | 22 +++++++
 .../datamap/CarbonDataMapRebuildCommand.scala   | 12 ++++
 .../datamap/CarbonDropDataMapCommand.scala      | 11 ++++
 .../management/CarbonLoadDataCommand.scala      | 21 +++++-
 8 files changed, 181 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
new file mode 100644
index 0000000..8fb374f
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DataMapEvents.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+
+/**
+ * For handling operations after index creation finishes on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPostExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations before the index datamap status update on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPreExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations after the index datamap status update on a table with an index
+ * datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class UpdateDataMapPostExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+
+/**
+ * For handling operations before index build starts on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPreExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier, dataMapNames: scala.collection.mutable.Seq[String])
+  extends Event with BuildDataMapEventsInfo
+
+/**
+ * For handling operations after index build finishes on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class BuildDataMapPostExecutionEvent(sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier)
+  extends Event with TableEventInfo
+
+/**
+ * For handling operations before index creation starts on a table with an index datamap
+ * example: bloom datamap, Lucene datamap
+ */
+case class CreateDataMapPreExecutionEvent(sparkSession: SparkSession,
+    storePath: String) extends Event with CreateDataMapEventsInfo
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index da62e02..1830a35 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
 
-import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -168,3 +167,20 @@ trait DeleteFromTableEventInfo {
 trait SessionEventInfo {
   val sparkSession: SparkSession
 }
+
+/**
+ * Event info for create datamap
+ */
+trait CreateDataMapEventsInfo {
+  val sparkSession: SparkSession
+  val storePath: String
+}
+
+/**
+ * Event info for build datamap
+ */
+trait BuildDataMapEventsInfo {
+  val sparkSession: SparkSession
+  val identifier: AbsoluteTableIdentifier
+  val dataMapNames: scala.collection.mutable.Seq[String]
+}
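
These traits only pin down the fields an event of each kind must expose, so a listener can
match on the trait instead of on every concrete event class. A hypothetical illustration
(this event is not part of the commit):

    import org.apache.spark.sql.SparkSession

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.events.{BuildDataMapEventsInfo, Event}

    // Hypothetical future event: mixing in BuildDataMapEventsInfo guarantees it
    // carries the session, table identifier and datamap names a listener expects.
    case class BuildDataMapAbortedEvent(sparkSession: SparkSession,
        identifier: AbsoluteTableIdentifier,
        dataMapNames: scala.collection.mutable.Seq[String])
      extends Event with BuildDataMapEventsInfo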

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index cde6201..d064306 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.TaskMetricsMap
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection, CarbonRecordReader}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
@@ -67,13 +69,20 @@ object IndexDataMapRebuildRDD {
     val validAndInvalidSegments = segmentStatusManager.getValidAndInvalidSegments()
     val validSegments = validAndInvalidSegments.getValidSegments
     val indexedCarbonColumns = carbonTable.getIndexedColumns(schema)
-
+    val operationContext = new OperationContext()
+    val buildDataMapPreExecutionEvent = new BuildDataMapPreExecutionEvent(sparkSession,
+      tableIdentifier,
+      mutable.Seq[String](schema.getDataMapName))
+    OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext)
     // loop all segments to rebuild DataMap
     validSegments.asScala.foreach { segment =>
      // if the lucene datamap folder already exists, no need to build the lucene datamap again
       refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
         indexedCarbonColumns, segment.getSegmentNo);
     }
+    val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
+      tableIdentifier)
+    OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
   }
 
   private def refreshOneSegment(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 7605b9d..fcc649e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -22,12 +22,13 @@ import java.util.List
 import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.readcommitter.{ReadCommittedScope, TableStatusReadCommittedScope}
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
@@ -156,7 +157,18 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         carbonMergerMapping,
         mergedLoadName)
     OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
-
+    // Add pre event listener for index datamap
+    val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
+    val dataMapOperationContext = new OperationContext()
+    if (null != tableDataMaps) {
+      val dataMapNames: mutable.Buffer[String] =
+        tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+      val dataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+        new BuildDataMapPreExecutionEvent(sqlContext.sparkSession,
+        carbonTable.getAbsoluteTableIdentifier, dataMapNames)
+      OperationListenerBus.getInstance().fireEvent(dataMapPreExecutionEvent,
+        dataMapOperationContext)
+    }
     var execInstance = "1"
    // in case of non-dynamic executor allocation, the number of executors is fixed.
     if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
@@ -272,6 +284,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         mergedLoadName)
       OperationListenerBus.getInstance()
         .fireEvent(compactionLoadStatusPostEvent, operationContext)
+      if (null != tableDataMaps) {
+        val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+          new BuildDataMapPostExecutionEvent(sqlContext.sparkSession,
+            carbonTable.getAbsoluteTableIdentifier)
+        OperationListenerBus.getInstance()
+          .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+      }
       val commitDone = operationContext.getProperty("commitComplete")
       val commitComplete = if (null != commitDone) {
         commitDone.toString.toBoolean
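
Note that the same dataMapOperationContext instance is passed to both the pre and the post
event, so a listener can stash state at pre time and read it back at post time through the
context's property map, just as the surrounding code does with "commitComplete". A hedged
sketch of such a listener (the class and the property key are assumptions for illustration):

    import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent,
      BuildDataMapPreExecutionEvent, Event, OperationContext, OperationEventListener}

    // Hypothetical listener: measures how long an index build took by pairing the
    // pre and post events through the shared OperationContext.
    class BuildTimingListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case _: BuildDataMapPreExecutionEvent =>
            operationContext.setProperty("buildStartTime",
              System.currentTimeMillis().toString)
          case _: BuildDataMapPostExecutionEvent =>
            val start = operationContext.getProperty("buildStartTime")
            if (null != start) {
              val elapsedMs = System.currentTimeMillis() - start.toString.toLong
              println(s"index datamap build took $elapsedMs ms")
            }
          case _ =>
        }
      }
    }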

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 1ae872a..27e1720 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -31,7 +31,9 @@ import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.{DataMapClassProvider, DataMapProperty}
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
+import org.apache.carbondata.events._
 
 /**
  * Below command class will be used to create datamap on table
@@ -108,8 +110,18 @@ case class CarbonCreateDataMapCommand(
               "column '%s' already has datamap created", column.getColName))
           }
         }
+        val operationContext: OperationContext = new OperationContext()
+        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+        val createDataMapPreExecutionEvent: CreateDataMapPreExecutionEvent =
+          new CreateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(createDataMapPreExecutionEvent,
+          operationContext)
         dataMapProvider.initMeta(queryString.orNull)
         DataMapStatusManager.disableDataMap(dataMapName)
+        val createDataMapPostExecutionEvent: CreateDataMapPostExecutionEvent =
+          new CreateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
+          operationContext)
       case _ =>
         if (deferredRebuild) {
           throw new MalformedDataMapCommandException(
@@ -128,7 +140,17 @@ case class CarbonCreateDataMapCommand(
       if (mainTable != null && !deferredRebuild) {
         dataMapProvider.rebuild()
         if (dataMapSchema.isIndexDataMap) {
+          val operationContext: OperationContext = new OperationContext()
+          val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+          val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+            new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+          OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+            operationContext)
           DataMapStatusManager.enableDataMap(dataMapName)
+          val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+            new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+          OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+            operationContext)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index 6493c83..beadc7e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -23,7 +23,9 @@ import org.apache.spark.sql.execution.command.DataCommand
 
 import org.apache.carbondata.core.datamap.{DataMapRegistry, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapRebuildRDD}
+import org.apache.carbondata.events._
 
 /**
  * Rebuild the datamaps by syncing with the main table data. After sync with the parent table's data it enables
@@ -49,7 +51,17 @@ case class CarbonDataMapRebuildCommand(
     provider.rebuild()
 
     // After rebuild successfully enable the datamap.
+    val operationContext: OperationContext = new OperationContext()
+    val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+    val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+      new UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+    OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+      operationContext)
     DataMapStatusManager.enableDataMap(dataMapName)
+    val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+      new UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+    OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+      operationContext)
     Seq.empty
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f1ed5d1..722119e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.datamap.{DataMapManager, IndexDataMapProvider}
 import org.apache.carbondata.events._
 
@@ -197,7 +198,17 @@ case class CarbonDropDataMapCommand(
       if (dataMapSchema != null) {
         dataMapProvider =
           DataMapManager.get.getDataMapProvider(mainTable, dataMapSchema, sparkSession)
+        val operationContext: OperationContext = new OperationContext()
+        val systemFolderLocation: String = CarbonProperties.getInstance().getSystemFolderLocation
+        val updateDataMapPreExecutionEvent: UpdateDataMapPreExecutionEvent =
+          UpdateDataMapPreExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPreExecutionEvent,
+          operationContext)
         DataMapStatusManager.dropDataMap(dataMapSchema.getDataMapName)
+        val updateDataMapPostExecutionEvent: UpdateDataMapPostExecutionEvent =
+          UpdateDataMapPostExecutionEvent(sparkSession, systemFolderLocation)
+        OperationListenerBus.getInstance().fireEvent(updateDataMapPostExecutionEvent,
+          operationContext)
         // if it is indexDataMap provider like lucene, then call cleanData, which will launch a job
         // to clear datamap from memory(clears from segmentMap and cache), This is called before
         // deleting the datamap schemas from _System folder

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b3f78206/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 69db3ea..38bdbcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -62,7 +62,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil, ObjectSerializationUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
@@ -233,6 +233,18 @@ case class CarbonLoadDataCommand(
             isOverwriteTable)
         operationContext.setProperty("isOverwrite", isOverwriteTable)
         OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+        // Add pre event listener for index datamap
+        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
+        val dataMapOperationContext = new OperationContext()
+        if (null != tableDataMaps) {
+          val dataMapNames: mutable.Buffer[String] =
+            tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+          val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+            new BuildDataMapPreExecutionEvent(sparkSession,
+              table.getAbsoluteTableIdentifier, dataMapNames)
+          OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+            dataMapOperationContext)
+        }
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
@@ -300,6 +312,13 @@ case class CarbonLoadDataCommand(
             table.getCarbonTableIdentifier,
             carbonLoadModel)
         OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+        if (null != tableDataMaps) {
+          val buildDataMapPostExecutionEvent: BuildDataMapPostExecutionEvent =
+            BuildDataMapPostExecutionEvent(sparkSession, table.getAbsoluteTableIdentifier)
+          OperationListenerBus.getInstance()
+            .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+        }
+
       } catch {
         case CausedBy(ex: NoRetryException) =>
           // update the load entry in table status file for changing the status to marked for delete