You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 05:49:07 UTC

carbondata git commit: [CARBONDATA-1592] Added event listeners

Repository: carbondata
Updated Branches:
  refs/heads/master 7e124f4f1 -> 9e0fd5ffe


[CARBONDATA-1592] Added event listeners

Description: Added an event listener interface to CarbonData. This allows the existing commands to be extended so that listeners can perform additional operations.

This closes #1562


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e0fd5ff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e0fd5ff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e0fd5ff

Branch: refs/heads/master
Commit: 9e0fd5ffe3dfad001a95ff592644a34c448d4843
Parents: 7e124f4
Author: Manohar <ma...@gmail.com>
Authored: Fri Nov 24 15:11:59 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 13:48:57 2017 +0800

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java |  6 ---
 .../carbondata/events/AlterTableEvents.scala    | 27 ++++++++++--
 .../events/CreateDatabaseEvents.scala           | 29 +++++++++++++
 .../carbondata/events/CreateTableEvents.scala   | 43 ++++++++++++++++++++
 .../apache/carbondata/events/IUDEvents.scala    | 12 ++++--
 .../apache/carbondata/events/LoadEvents.scala   |  6 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  4 +-
 .../apache/carbondata/spark/rdd/Compactor.scala | 12 +++++-
 .../scala/org/apache/spark/util/FileUtils.scala | 13 ++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  7 ----
 .../spark/sql/CarbonDictionaryDecoder.scala     |  2 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  7 +++-
 .../execution/BatchedDataSourceScanExec.scala   |  4 +-
 .../command/CarbonCreateTableCommand.scala      | 13 +++++-
 .../command/management/CleanFilesCommand.scala  | 10 +++++
 .../command/management/LoadTableCommand.scala   | 15 +++++++
 .../mutation/ProjectForDeleteCommand.scala      |  4 +-
 .../mutation/ProjectForUpdateCommand.scala      |  5 +--
 .../CarbonAlterTableAddColumnCommand.scala      | 11 +++--
 .../CarbonAlterTableDataTypeChangeCommand.scala | 14 +++++--
 .../strategy/CarbonLateDecodeStrategy.scala     |  2 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  3 ++
 22 files changed, 204 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 02fe054..1175cf0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2026,12 +2026,6 @@ public final class CarbonUtil {
     }
   }
 
-  public static void createDatabaseDirectory(String dbName, String storePath) throws IOException {
-    String databasePath = storePath + File.separator + dbName.toLowerCase();
-    FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
-    FileFactory.mkdirs(databasePath, fileType);
-  }
-
   public static void dropDatabaseDirectory(String dbName, String storePath)
       throws IOException, InterruptedException {
     String databasePath = storePath + File.separator + dbName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 5a818fc..1a0c305 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -40,11 +40,21 @@ case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
  * @param carbonTable
  * @param alterTableDataTypeChangeModel
  */
-case class AlterTableDataTypeChangePreEvent(carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
         alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
   extends Event with AlterTableDataTypeChangeEventInfo
 
 /**
+ * Post event for the alter table data type change operation.
+ *
+ * @param carbonTable
+ * @param alterTableDataTypeChangeModel
+ */
+case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+  extends Event with AlterTableDataTypeChangeEventInfo
+
+/**
  *
  * @param carbonTable
  * @param alterTableDropColumnModel
@@ -82,7 +92,16 @@ case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
  * @param carbonTable
  * @param alterTableAddColumnsModel
  */
-case class AlterTableAddColumnPreEvent(carbonTable: CarbonTable,
+case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+    alterTableAddColumnsModel: AlterTableAddColumnsModel)
+  extends Event with AlterTableAddColumnEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableAddColumnsModel
+ */
+case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
     alterTableAddColumnsModel: AlterTableAddColumnsModel)
   extends Event with AlterTableAddColumnEventInfo
 
@@ -118,7 +137,7 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
  * @param mergedLoadName
  * @param sQLContext
  */
-case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
+case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
     carbonLoadModel: CarbonLoadModel,
     mergedLoadName: String,
     sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
@@ -131,7 +150,7 @@ case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
  * @param mergedLoadName
  * @param sQLContext
  */
-case class AlterTableCompactionPostEvent(carbonTable: CarbonTable,
+case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
     carbonLoadModel: CarbonLoadModel,
     mergedLoadName: String,
     sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
new file mode 100644
index 0000000..dae22b1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+
+case class CreateDatabasePreExecutionEvent(databaseName: String) extends Event
+  with DatabaseEventInfo
+
+case class CreateDatabasePostExecutionEvent(databaseName: String, dataBasePath: String)
+  extends Event with DatabaseEventInfo
+
+case class CreateDatabaseAbortExecutionEvent(databaseName: String)
+  extends Event with DatabaseEventInfo
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
new file mode 100644
index 0000000..3b03e4d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql._
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+
+/**
+ * Class for handling operations before a table is created.
+ * Example usage: For validation purpose
+ */
+case class CreateTablePreExecutionEvent(sparkSession: SparkSession,
+    carbonTableIdentifier: CarbonTableIdentifier,
+    storePath: String) extends Event with TableEventInfo
+
+/**
+ * Class for handling operations after the create table operation has
+ * completed.
+ */
+case class CreateTablePostExecutionEvent(sparkSession: SparkSession,
+    carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo
+
+/**
+ * Class for handling clean up in case of any failure and abort the operation.
+ */
+case class CreateTableAbortExecutionEvent(sparkSession: SparkSession,
+    carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
index deebb65..1bd94d4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
@@ -16,20 +16,24 @@
  */
 package org.apache.carbondata.events
 
+import org.apache.spark.sql.SparkSession
+
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
 /**
  *
  * @param carbonTable
  */
-case class UpdateTablePreEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+case class UpdateTablePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
+  extends Event with UpdateTableEventInfo
 
 
 /**
  *
  * @param carbonTable
  */
-case class UpdateTablePostEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+case class UpdateTablePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
+  extends Event with UpdateTableEventInfo
 
 
 /**
@@ -42,7 +46,7 @@ case class UpdateTableAbortEvent(carbonTable: CarbonTable) extends Event with Up
  *
  * @param carbonTable
  */
-case class DeleteFromTablePreEvent(carbonTable: CarbonTable)
+case class DeleteFromTablePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
   extends Event with DeleteFromTableEventInfo
 
 
@@ -50,7 +54,7 @@ case class DeleteFromTablePreEvent(carbonTable: CarbonTable)
  *
  * @param carbonTable
  */
-case class DeleteFromTablePostEvent(carbonTable: CarbonTable)
+case class DeleteFromTablePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
   extends Event with DeleteFromTableEventInfo
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index e3833d8..d87a1af 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -28,7 +28,11 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
  */
 case class LoadTablePreExecutionEvent(sparkSession: SparkSession,
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+    carbonLoadModel: CarbonLoadModel,
+    factPath: String,
+    isDataFrameDefined: Boolean,
+    optionsFinal: scala.collection
+    .mutable.Map[String, String]) extends Event with LoadEventInfo
 
 /**
  * Class for handling operations after data load completion and before final

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7eaf95a..171b71b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,8 +56,8 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
  */
 class CarbonScanRDD(
     @transient sc: SparkContext,
-    columnProjection: CarbonProjection,
-    filterExpression: Expression,
+    val columnProjection: CarbonProjection,
+    var filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
     @transient serializedTableInfo: Array[Byte],
     @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index ba634b7..e41211a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -69,7 +69,11 @@ object Compactor {
     // trigger event for compaction
     val operationContext = new OperationContext
     val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
-      AlterTableCompactionPreEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+      AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession,
+        carbonTable,
+        carbonLoadModel,
+        mergedLoadName,
+        sc)
     OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
 
     var execInstance = "1"
@@ -118,7 +122,11 @@ object Compactor {
 
       // trigger event for compaction
       val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
-        AlterTableCompactionPostEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+        AlterTableCompactionPostEvent(compactionCallableModel.sqlContext.sparkSession,
+          carbonTable,
+          carbonLoadModel,
+          mergedLoadName,
+          sc)
       OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
 
       val endTime = System.nanoTime()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 681910d..b60d030 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import java.io.{File, IOException}
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -24,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{CreateDatabasePostExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.exception.DataLoadingException
 
 object FileUtils {
@@ -102,4 +105,14 @@ object FileUtils {
     }
   }
 
+  def createDatabaseDirectory(dbName: String, storePath: String) {
+    val databasePath: String = storePath + File.separator + dbName.toLowerCase
+    val fileType = FileFactory.getFileType(databasePath)
+    FileFactory.mkdirs(databasePath, fileType)
+    val operationContext = new OperationContext
+    val createDatabasePostExecutionEvent = new CreateDatabasePostExecutionEvent(dbName,
+      databasePath)
+    OperationListenerBus.getInstance.fireEvent(createDatabasePostExecutionEvent, operationContext)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 351d765..145068f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -496,13 +496,6 @@ object CarbonDataRDDFactory {
         throw new Exception("No Data to load")
       }
       writeDictionary(carbonLoadModel, result, writeAll = false)
-      // Register a handler here for executing tasks required before committing
-      // the load operation to a table status file
-      val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
-      LoadTablePostExecutionEvent(sqlContext.sparkSession,
-        carbonTable.getCarbonTableIdentifier,
-        carbonLoadModel)
-      OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
       val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
       if (!done) {
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 9d88c4c..1a336c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -461,7 +461,7 @@ class CarbonDecoderRDD(
     relations: Seq[CarbonDecoderRelation],
     profile: CarbonProfile,
     aliasMap: CarbonAliasDecoderRelation,
-    prev: RDD[InternalRow],
+    val prev: RDD[InternalRow],
     output: Seq[Attribute],
     serializedTableInfo: Array[Byte])
   extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index acef2e1..592e2c9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -26,11 +26,13 @@ import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, Carbo
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
 import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
+
 /**
  * Carbon Environment for unified context
  */
@@ -80,9 +82,10 @@ class CarbonEnv {
           }
           LOGGER.info(s"carbon env initial: $storePath")
           // trigger event for CarbonEnv init
+          val operationContext = new OperationContext
           val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
             CarbonEnvInitPreEvent(sparkSession, storePath)
-          OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent)
+          OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
 
           CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
index f08c111..3c3efaa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.DataType
@@ -35,7 +36,8 @@ case class BatchedDataSourceScanExec(
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
     override val metadata: Map[String, String],
-    override val metastoreTableIdentifier: Option[TableIdentifier])
+    override val metastoreTableIdentifier: Option[TableIdentifier],
+    @transient logicalRelation: LogicalRelation)
   extends DataSourceScanExec with CodegenSupport {
 
   override lazy val metrics =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index 06f3645..f9b8b9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 
 case class CarbonCreateTableCommand(
     cm: TableModel,
@@ -78,6 +79,12 @@ case class CarbonCreateTableCommand(
       }
     } else {
       val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
+      val operationContext = new OperationContext
+      val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
+        new CreateTablePreExecutionEvent(sparkSession,
+          tableIdentifier.getCarbonTableIdentifier,
+          tablePath)
+      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
       // Add Database to catalog and persist
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
@@ -93,7 +100,7 @@ case class CarbonCreateTableCommand(
                |(${ fields.map(f => f.rawSchema).mkString(",") })
                |USING org.apache.spark.sql.CarbonSource""".stripMargin +
             s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
-            s""""$tablePath"$carbonSchemaString) """)
+            s""""$tablePath", path "$tablePath" $carbonSchemaString) """.stripMargin)
         } catch {
           case e: AnalysisException => throw e
           case e: Exception =>
@@ -107,7 +114,9 @@ case class CarbonCreateTableCommand(
             CarbonException.analysisException(msg)
         }
       }
-
+      val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
+        new CreateTablePostExecutionEvent(sparkSession, tableIdentifier.getCarbonTableIdentifier)
+      OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
       LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
     }
     Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 7625579..d6604d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -45,6 +45,12 @@ case class CleanFilesCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
+    val operationContext = new OperationContext
+    val cleanFilesPreEvent: CleanFilesPreEvent =
+      CleanFilesPreEvent(carbonTable,
+        sparkSession)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
     if (tableName.isDefined) {
       Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
       if (forceTableClean) {
@@ -55,6 +61,10 @@ case class CleanFilesCommand(
     } else {
       cleanGarbageDataInAllTables(sparkSession)
     }
+    // trigger post event for clean files, reusing the context created for the pre event
+    val cleanFilesPostEvent: CleanFilesPostEvent =
+      CleanFilesPostEvent(carbonTable, sparkSession)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
     Seq.empty
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index d805227..0025011 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -139,6 +140,15 @@ case class LoadTableCommand(
         carbonLoadModel,
         hadoopConf
       )
+      val operationContext = new OperationContext
+      val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+        new LoadTablePreExecutionEvent(sparkSession,
+          null,
+          carbonLoadModel,
+          factPath,
+          dataFrame.isDefined,
+          optionsFinal)
+      OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
 
       try{
         // First system has to partition the data first and then call the load data
@@ -187,6 +197,11 @@ case class LoadTableCommand(
             partitionStatus,
             hadoopConf)
         }
+        val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+          new LoadTablePostExecutionEvent(sparkSession,
+            table.getCarbonTableIdentifier,
+            carbonLoadModel)
+        OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
       } catch {
         case CausedBy(ex: NoRetryException) =>
           LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index cf5bfd8..2b9caf7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -56,7 +56,7 @@ private[sql] case class ProjectForDeleteCommand(
     // trigger event for Delete from table
     val operationContext = new OperationContext
     val deleteFromTablePreEvent: DeleteFromTablePreEvent =
-      DeleteFromTablePreEvent(carbonTable)
+      DeleteFromTablePreEvent(sparkSession, carbonTable)
     OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
 
     val metadataLock = CarbonLockFactory
@@ -85,7 +85,7 @@ private[sql] case class ProjectForDeleteCommand(
 
         // trigger post event for Delete from table
         val deleteFromTablePostEvent: DeleteFromTablePostEvent =
-          DeleteFromTablePostEvent(carbonTable)
+          DeleteFromTablePostEvent(sparkSession, carbonTable)
         OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
       }
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index da62f27..341f368 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -62,9 +62,8 @@ private[sql] case class ProjectForUpdateCommand(
     // trigger event for Update table
     val operationContext = new OperationContext
     val updateTablePreEvent: UpdateTablePreEvent =
-      UpdateTablePreEvent(carbonTable)
+      UpdateTablePreEvent(sparkSession, carbonTable)
     OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
-
     val metadataLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
@@ -117,7 +116,7 @@ private[sql] case class ProjectForUpdateCommand(
 
       // trigger event for Update table
       val updateTablePostEvent: UpdateTablePostEvent =
-        UpdateTablePostEvent(carbonTable)
+        UpdateTablePostEvent(sparkSession, carbonTable)
       OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
     } catch {
       case e: HorizontalCompactionException =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index ed82140..0a720da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableAddColumnPreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
 
@@ -59,9 +59,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       // up relation should be called after acquiring the lock
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
+      val operationContext = new OperationContext
+      val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
         alterTableAddColumnsModel)
-      OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
+      OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
       val carbonTablePath = CarbonStorePath
@@ -93,6 +94,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         .updateSchemaInfo(carbonTable,
           schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
+      val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
+        new AlterTableAddColumnPostEvent(sparkSession,
+          carbonTable, alterTableAddColumnsModel)
+      OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index f63cf0b..6e2c83d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableAddColumnPreEvent, AlterTableDataTypeChangePreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
 
@@ -52,9 +52,11 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
         .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-      val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
-        alterTableDataTypeChangeModel)
-      OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)
+      val operationContext = new OperationContext
+      val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(sparkSession,
+        carbonTable, alterTableDataTypeChangeModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(alterTableDataTypeChangeListener, operationContext)
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -96,6 +98,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
         .setTime_stamp(System.currentTimeMillis)
       AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
+      val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
+        new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
+          alterTableDataTypeChangeModel)
+      OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index aecddcb..2c23c57 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -329,7 +329,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         relation.relation,
         getPartitioning(table.carbonTable, updateRequestedColumns),
         metadata,
-        relation.catalogTable.map(_.identifier))
+        relation.catalogTable.map(_.identifier), relation)
     } else {
       RowDataSourceScanExec(output,
         scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 6ea743d..bee762a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
+import org.apache.spark.util.FileUtils
 
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -78,6 +80,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
       _, child: LogicalPlan, overwrite, _) =>
         ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite)) :: Nil
       case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+        FileUtils.createDatabaseDirectory(dbName, CarbonProperties.getStorePath)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil