You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/10 05:16:34 UTC
[13/15] carbondata git commit: [CARBONDATA-1592]Event listener added
[CARBONDATA-1592]Event listener added
This closes #1473
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4aa0f493
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4aa0f493
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4aa0f493
Branch: refs/heads/pre-aggregate
Commit: 4aa0f493038a050f61af7c5d6c3af22cd20fd56b
Parents: fd0bdf6
Author: rahulforallp <ra...@knoldus.in>
Authored: Tue Nov 7 11:50:15 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 10 10:19:46 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/datamap/TableDataMap.java | 15 ++-
.../core/datamap/dev/DataMapFactory.java | 4 +-
.../carbondata/core/events/ChangeEvent.java | 35 -----
.../carbondata/core/events/EventListener.java | 25 ----
.../blockletindex/BlockletDataMapFactory.java | 11 +-
.../org/apache/carbondata/events/Event.java | 31 +++++
.../carbondata/events/OperationContext.java | 41 ++++++
.../events/OperationEventListener.java | 31 +++++
.../carbondata/events/OperationListenerBus.java | 119 +++++++++++++++++
.../testsuite/datamap/DataMapWriterSuite.scala | 4 +-
.../carbondata/events/AlterTableEvents.scala | 132 +++++++++++++++++++
.../carbondata/events/CarbonInitEvents.scala | 38 ++++++
.../carbondata/events/CleanFilesEvents.scala | 46 +++++++
.../carbondata/events/DeleteSegmentEvents.scala | 77 +++++++++++
.../carbondata/events/DropTableEvents.scala | 57 ++++++++
.../org/apache/carbondata/events/Events.scala | 131 ++++++++++++++++++
.../apache/carbondata/events/IUDEvents.scala | 62 +++++++++
.../apache/carbondata/events/LoadEvents.scala | 46 +++++++
.../events/LookupRelationEvents.scala | 51 +++++++
.../apache/carbondata/spark/rdd/Compactor.scala | 14 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 9 ++
.../scala/org/apache/spark/sql/CarbonEnv.scala | 6 +
.../command/CarbonDropTableCommand.scala | 22 +++-
.../command/management/CleanFilesCommand.scala | 11 ++
.../management/DeleteLoadByIdCommand.scala | 15 +++
.../DeleteLoadByLoadDateCommand.scala | 15 +++
.../mutation/ProjectForDeleteCommand.scala | 14 +-
.../mutation/ProjectForUpdateCommand.scala | 13 ++
.../CarbonAlterTableDropColumnCommand.scala | 19 +++
.../schema/CarbonAlterTableRenameCommand.scala | 16 +++
.../spark/sql/hive/CarbonFileMetastore.scala | 9 ++
31 files changed, 1039 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 1cf1def..705a9fd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -22,19 +22,20 @@ import java.util.List;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
-import org.apache.carbondata.core.events.ChangeEvent;
-import org.apache.carbondata.core.events.EventListener;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.events.Event;
+import org.apache.carbondata.events.OperationContext;
+import org.apache.carbondata.events.OperationEventListener;
/**
* DataMap at the table level, user can add any number of datamaps for one table. Depends
* on the filter condition it can prune the blocklets.
*/
-public final class TableDataMap implements EventListener {
+public final class TableDataMap implements OperationEventListener {
private AbsoluteTableIdentifier identifier;
@@ -132,10 +133,6 @@ public final class TableDataMap implements EventListener {
return detailedBlocklets;
}
- @Override public void fireEvent(ChangeEvent event) {
- dataMapFactory.fireEvent(event);
- }
-
/**
* Clear only the datamaps of the segments
* @param segmentIds
@@ -164,4 +161,8 @@ public final class TableDataMap implements EventListener {
public DataMapFactory getDataMapFactory() {
return dataMapFactory;
}
+
+ @Override public void onEvent(Event event, OperationContext opContext) {
+ dataMapFactory.fireEvent(event);
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index cf0519b..f5a7404 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
-import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.events.Event;
/**
* Interface for datamap factory, it is responsible for creating the datamap.
@@ -59,7 +59,7 @@ public interface DataMapFactory {
*
* @param event
*/
- void fireEvent(ChangeEvent event);
+ void fireEvent(Event event);
/**
* Clears datamap of the segment
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/core/events/ChangeEvent.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/events/ChangeEvent.java b/core/src/main/java/org/apache/carbondata/core/events/ChangeEvent.java
deleted file mode 100644
index de02c64..0000000
--- a/core/src/main/java/org/apache/carbondata/core/events/ChangeEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.events;
-
-/**
- * Change event for any updates in store.
- */
-public interface ChangeEvent<T> {
-
- EventType getEventType();
-
- T getEventData();
-
- void setEventData(T data);
-
- enum EventType {
- INSERT,DELETE,UPDATE,REFRESH
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/core/events/EventListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/events/EventListener.java b/core/src/main/java/org/apache/carbondata/core/events/EventListener.java
deleted file mode 100644
index adf45b2..0000000
--- a/core/src/main/java/org/apache/carbondata/core/events/EventListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.events;
-
-/**
- * Event listener
- */
-public interface EventListener {
-
- void fireEvent(ChangeEvent event);
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index a1bbba5..e58c911 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -32,13 +32,13 @@ import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.indexstore.Blocklet;
import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -164,6 +164,10 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
return distributables;
}
+ @Override public void fireEvent(Event event) {
+
+ }
+
@Override
public void clear(String segmentId) {
List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId);
@@ -213,11 +217,6 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe
}
@Override
- public void fireEvent(ChangeEvent event) {
-
- }
-
- @Override
public DataMapMeta getMeta() {
// TODO: pass SORT_COLUMNS into this class
return null;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/events/Event.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/Event.java b/core/src/main/java/org/apache/carbondata/events/Event.java
new file mode 100644
index 0000000..4de6632
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/events/Event.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events;
+
+/**
+ * Event class which will be used for retrieving the type of interface
+ */
+public abstract class Event {
+
+ /**
+ * Method for getting the event type. Used for invoking all listeners registered for an event
+ *
+ * @return
+ */
+ String getEventType() { return this.getClass().getName(); }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/events/OperationContext.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationContext.java b/core/src/main/java/org/apache/carbondata/events/OperationContext.java
new file mode 100644
index 0000000..f6fe676
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/events/OperationContext.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * One OperationContext per one operation.
+ * OperationContext active till operation execution completes
+ */
+public class OperationContext {
+
+ private Map<String, Object> operationProperties = new HashMap<String, Object>();
+
+ public Map<String, Object> getProperties() {
+ return operationProperties;
+ }
+
+ public void setProperty(String key, Object value) {
+ this.operationProperties.put(key, value);
+ }
+
+ public Object getProperty(String key) {
+ return this.operationProperties.get(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
new file mode 100644
index 0000000..7007f9b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/events/OperationEventListener.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events;
+
+/**
+ * Event listener interface which describes the possible events
+ */
+public interface OperationEventListener {
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ void onEvent(Event event, OperationContext operationContext) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
new file mode 100644
index 0000000..01ecb04
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * An event bus which posts events to its listeners.
+ */
+public class OperationListenerBus {
+
+ /**
+ * singleton instance
+ */
+ private static final OperationListenerBus INSTANCE = new OperationListenerBus();
+
+ /**
+ * Event map to hold all listeners corresponding to an event
+ */
+ protected Map<String, List<OperationEventListener>> eventMap =
+ new ConcurrentHashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ /**
+ * @return
+ */
+ public static OperationListenerBus getInstance() {
+ return INSTANCE;
+ }
+
+ /**
+ * Add a listener to listen events. This method is thread-safe and can be called in any thread.
+ *
+ * @param eventClass
+ * @param operationEventListener
+ */
+ public void addListener(Class<? extends Event> eventClass,
+ OperationEventListener operationEventListener) {
+
+ String eventType = eventClass.getName();
+ List<OperationEventListener> operationEventListeners = eventMap.get(eventType);
+ if (null == operationEventListeners) {
+ operationEventListeners = new CopyOnWriteArrayList<>();
+ eventMap.put(eventType, operationEventListeners);
+ }
+ operationEventListeners.add(operationEventListener);
+ }
+
+ /**
+ * Removes a given map from the event. This method is thread-safe and can be called
+ * in any thread.
+ *
+ * @param eventType
+ */
+ public void removeEvent(String eventType) {
+ eventMap.remove(eventType);
+ }
+
+ /**
+ * Removes a given listener for an event. This method is thread-safe and can be called
+ * in any thread.
+ *
+ * @param eventType
+ * @param operationEventListener
+ */
+ public void removeListener(String eventType, OperationEventListener operationEventListener) {
+ List<OperationEventListener> operationEventListeners = eventMap.get(eventType);
+ if (null != operationEventListeners) {
+ operationEventListeners.remove(operationEventListener);
+ }
+ }
+
+ /**
+ * Notify all registered listeners on occurrence of an event
+ * Should be used for stateless events which cannot be mapped to a operation
+ *
+ * @param event
+ */
+ public void fireEvent(Event event) throws Exception {
+ fireEvent(event, new OperationContext());
+ }
+
+ /**
+ * Notify all registered listeners on occurrence of an event
+ *
+ * @param event
+ * @param operationContext
+ */
+ public void fireEvent(Event event, OperationContext operationContext) throws Exception {
+ if (operationContext == null) {
+ throw new Exception("OperationContext cannot be null");
+ }
+ List<OperationEventListener> operationEventListeners = eventMap.get(event.getEventType());
+ if (null != operationEventListeners) {
+ for (OperationEventListener operationEventListener : operationEventListeners) {
+ operationEventListener.onEvent(event, operationContext);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 888c97d..348d9a7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -26,18 +26,18 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.events.ChangeEvent
import org.apache.carbondata.core.indexstore.schema.FilterType
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
class C2DataMapFactory() extends DataMapFactory {
override def init(identifier: AbsoluteTableIdentifier,
dataMapName: String): Unit = {}
- override def fireEvent(event: ChangeEvent[_]): Unit = ???
+ override def fireEvent(event: Event): Unit = ???
override def clear(segmentId: String): Unit = {}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
new file mode 100644
index 0000000..ec79acc
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, AlterTableRenameModel}
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableDropColumnModel
+ * @param sparkSession
+ */
+case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
+ alterTableDropColumnModel: AlterTableDropColumnModel,
+ sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableDropColumnModel
+ * @param sparkSession
+ */
+case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable,
+ alterTableDropColumnModel: AlterTableDropColumnModel,
+ sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableDropColumnModel
+ * @param sparkSession
+ */
+case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable,
+ alterTableDropColumnModel: AlterTableDropColumnModel,
+ sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableRenameModel
+ * @param newTablePath
+ * @param sparkSession
+ */
+case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
+ alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
+ sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableRenameModel
+ * @param newTablePath
+ * @param sparkSession
+ */
+case class AlterTableRenamePostEvent(carbonTable: CarbonTable,
+ alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
+ sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableRenameModel
+ * @param newTablePath
+ * @param sparkSession
+ */
+case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
+ alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
+ sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param carbonLoadModel
+ * @param mergedLoadName
+ * @param sQLContext
+ */
+case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
+ carbonLoadModel: CarbonLoadModel,
+ mergedLoadName: String,
+ sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param carbonLoadModel
+ * @param mergedLoadName
+ * @param sQLContext
+ */
+case class AlterTableCompactionPostEvent(carbonTable: CarbonTable,
+ carbonLoadModel: CarbonLoadModel,
+ mergedLoadName: String,
+ sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+
+
+/**
+ * Class for handling clean up in case of any failure and abort the operation
+ *
+ * @param carbonTable
+ * @param carbonLoadModel
+ * @param mergedLoadName
+ * @param sQLContext
+ */
+case class AlterTableCompactionAbortEvent(carbonTable: CarbonTable,
+ carbonLoadModel: CarbonLoadModel,
+ mergedLoadName: String,
+ sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/CarbonInitEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CarbonInitEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CarbonInitEvents.scala
new file mode 100644
index 0000000..b76c534
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CarbonInitEvents.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ *
+ * @param sparkSession
+ * @param storePath
+ */
+case class CarbonEnvInitPreEvent(
+ sparkSession: SparkSession, storePath: String)
+ extends Event with SessionEventInfo
+
+
+/**
+ *
+ * @param sparkSession
+ * @param storePath
+ */
+case class CarbonEnvInitPostEvent(
+ sparkSession: SparkSession, storePath: String)
+ extends Event with SessionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
new file mode 100644
index 0000000..1a9c5f6
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CleanFilesEvents.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class CleanFilesPreEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
+ extends Event with CleanFilesEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class CleanFilesPostEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
+ extends Event with CleanFilesEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class CleanFilesAbortEvent(carbonTable: CarbonTable, sparkSession: SparkSession)
+ extends Event with CleanFilesEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/DeleteSegmentEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DeleteSegmentEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DeleteSegmentEvents.scala
new file mode 100644
index 0000000..0008492
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DeleteSegmentEvents.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ *
+ * @param carbonTable
+ * @param loadIds
+ * @param sparkSession
+ */
+case class DeleteSegmentByIdPreEvent(carbonTable: CarbonTable, loadIds: Seq[String],
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param loadIds
+ * @param sparkSession
+ */
+case class DeleteSegmentByIdPostEvent(carbonTable: CarbonTable, loadIds: Seq[String],
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param loadIds
+ * @param sparkSession
+ */
+case class DeleteSegmentByIdAbortEvent(carbonTable: CarbonTable, loadIds: Seq[String],
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyIdEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param loadDates
+ * @param sparkSession
+ */
+case class DeleteSegmentByDatePreEvent(carbonTable: CarbonTable, loadDates: String,
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param loadDates
+ * @param sparkSession
+ */
+case class DeleteSegmentByDatePostEvent(carbonTable: CarbonTable, loadDates: String,
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param loadDates
+ * @param sparkSession
+ */
+case class DeleteSegmentByDateAbortEvent(carbonTable: CarbonTable, loadDates: String,
+ sparkSession: SparkSession) extends Event with DeleteSegmentbyDateEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
new file mode 100644
index 0000000..ed43de6
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+
+/**
+ *
+ * @param carbonTable
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropTablePreEvent(carbonTable: CarbonTable,
+ ifExistsSet: Boolean,
+ sparkSession: SparkSession)
+ extends Event with DropTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropTablePostEvent(carbonTable: CarbonTable,
+ ifExistsSet: Boolean,
+ sparkSession: SparkSession)
+ extends Event with DropTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param ifExistsSet
+ * @param sparkSession
+ */
+case class DropTableAbortEvent(carbonTable: CarbonTable,
+ ifExistsSet: Boolean,
+ sparkSession: SparkSession)
+ extends Event with DropTableEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
new file mode 100644
index 0000000..0d923ed
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, AlterTableRenameModel}
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+/**
+ * event for database operations
+ */
+trait DatabaseEventInfo {
+ val databaseName: String
+}
+
+/**
+ * event for table related operations
+ */
+trait TableEventInfo {
+ val carbonTableIdentifier: CarbonTableIdentifier
+}
+
+/**
+ * event for load operations
+ */
+trait LoadEventInfo {
+ val carbonLoadModel: CarbonLoadModel
+}
+
+/**
+ * event for lookup
+ */
+trait LookupRelationEventInfo {
+ val carbonTable: CarbonTable
+}
+
+
+/**
+ * event for drop table
+ */
+trait DropTableEventInfo {
+ val carbonTable: CarbonTable
+ val ifExistsSet: Boolean
+}
+
+/**
+ * event for alter_table_drop_column
+ */
+trait AlterTableDropColumnEventInfo {
+ val carbonTable: CarbonTable
+ val alterTableDropColumnModel: AlterTableDropColumnModel
+}
+
+/**
+ * event for alter_table_rename
+ */
+trait AlterTableRenameEventInfo {
+ val carbonTable: CarbonTable
+ val alterTableRenameModel: AlterTableRenameModel
+}
+
+/**
+ * event for alter_table_rename
+ */
+trait AlterTableCompactionEventInfo {
+ val carbonTable: CarbonTable
+ val carbonLoadModel: CarbonLoadModel
+ val mergedLoadName: String
+}
+
+/**
+ * event for DeleteSegmentById
+ */
+trait DeleteSegmentbyIdEventInfo {
+ val carbonTable: CarbonTable
+ val loadIds: Seq[String]
+}
+
+/**
+ * event for DeleteSegmentByDate
+ */
+trait DeleteSegmentbyDateEventInfo {
+ val carbonTable: CarbonTable
+ val loadDates: String
+}
+
+/**
+ * event for Clean Files
+ */
+trait CleanFilesEventInfo {
+ val carbonTable: CarbonTable
+}
+
+/**
+ * event for update table
+ */
+trait UpdateTableEventInfo {
+ val carbonTable: CarbonTable
+}
+
+/**
+ * event for delete from table
+ */
+trait DeleteFromTableEventInfo {
+ val carbonTable: CarbonTable
+}
+
+/**
+ * event to initiate CarbonEnv
+ */
+trait SessionEventInfo {
+ val sparkSession: SparkSession
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
new file mode 100644
index 0000000..deebb65
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.events
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ *
+ * @param carbonTable
+ */
+case class UpdateTablePreEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ */
+case class UpdateTablePostEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ */
+case class UpdateTableAbortEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+
+/**
+ *
+ * @param carbonTable
+ */
+case class DeleteFromTablePreEvent(carbonTable: CarbonTable)
+ extends Event with DeleteFromTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ */
+case class DeleteFromTablePostEvent(carbonTable: CarbonTable)
+ extends Event with DeleteFromTableEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ */
+case class DeleteFromTableAbortEvent(carbonTable: CarbonTable)
+ extends Event with DeleteFromTableEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
new file mode 100644
index 0000000..e3833d8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+/**
+ * Class for handling operations before start of a load process.
+ * Example usage: For validation purpose
+ */
+case class LoadTablePreExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier,
+ carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+
+/**
+ * Class for handling operations after data load completion and before final
+ * commit of load operation. Example usage: For loading pre-aggregate tables
+ */
+case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier,
+ carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+
+/**
+ * Class for handling clean up in case of any failure and abort the operation.
+ */
+case class LoadTableAbortExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier,
+ carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/events/LookupRelationEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LookupRelationEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LookupRelationEvents.scala
new file mode 100644
index 0000000..178ff58
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LookupRelationEvents.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class LookupRelationPreEvent(
+ carbonTable: CarbonTable,
+ sparkSession: SparkSession) extends Event with LookupRelationEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class LookupRelationPostEvent(
+ carbonTable: CarbonTable,
+ sparkSession: SparkSession) extends Event with LookupRelationEventInfo
+
+
+/**
+ *
+ * @param carbonTable
+ * @param sparkSession
+ */
+case class LookupRelationAbortEvent(
+ carbonTable: CarbonTable,
+ sparkSession: SparkSession) extends Event with LookupRelationEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index cb25756..adc71cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -22,9 +22,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.events._
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
import org.apache.carbondata.spark.util.CommonUtil
@@ -68,6 +68,12 @@ object Compactor {
carbonLoadModel.setStorePath(carbonMergerMapping.hdfsStoreLocation)
carbonLoadModel.setLoadMetadataDetails(
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+ // trigger event for compaction
+ val operationContext = new OperationContext
+ val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+ AlterTableCompactionPreEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+
var execInstance = "1"
// in case of non dynamic executor allocation, number of executors are fixed.
if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
@@ -110,6 +116,12 @@ object Compactor {
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
CommonUtil.mergeIndexFiles(sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable)
+
+ // trigger event for compaction
+ val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+ AlterTableCompactionPostEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+
val endTime = System.nanoTime()
logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
val statusFileUpdation =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 47d7c95..1083669 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.BlockDetails
@@ -267,6 +268,7 @@ object CarbonDataRDDFactory {
updateModel: Option[UpdateTableModel] = None
): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val operationContext = new OperationContext
// for handling of the segment Merging.
LOGGER.audit(s"Data load request has been received for table" +
@@ -475,6 +477,13 @@ object CarbonDataRDDFactory {
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
+ // Register a handler here for executing tasks required before committing
+ // the load operation to a table status file
+ val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+ LoadTablePostExecutionEvent(sqlContext.sparkSession,
+ carbonTable.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 9d10ea0..b324b10 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.internal.CarbonSQLConf
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -70,6 +71,11 @@ class CarbonEnv {
properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
}
LOGGER.info(s"carbon env initial: $storePath")
+ // trigger event for CarbonEnv init
+ val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
+ CarbonEnvInitPreEvent(sparkSession, storePath)
+ OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent)
+
CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
}
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 5f70771..5905493 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -21,13 +21,15 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{DropTablePostEvent, DropTablePreEvent, OperationContext, OperationListenerBus}
case class CarbonDropTableCommand(
ifExistsSet: Boolean,
@@ -59,8 +61,26 @@ case class CarbonDropTableCommand(
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
+ // fires the event before dropping main table
+ val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val operationContext = new OperationContext
+ val dropTablePreEvent: DropTablePreEvent =
+ DropTablePreEvent(
+ carbonTable,
+ ifExistsSet,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
+
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+
+ val dropTablePostEvent: DropTablePostEvent =
+ DropTablePostEvent(
+ carbonTable,
+ ifExistsSet,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
+
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} catch {
case ex: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 9406335..1b16b88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
case class CleanFilesCommand(
databaseNameOp: Option[String],
@@ -46,12 +47,22 @@ case class CleanFilesCommand(
val relation = catalog
.lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
val carbonTable = relation.tableMeta.carbonTable
+ val cleanFilesPreEvent: CleanFilesPreEvent =
+ CleanFilesPreEvent(carbonTable,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
+
CarbonStore.cleanFiles(
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
carbonTable,
forceTableClean)
+
+ val cleanFilesPostEvent: CleanFilesPostEvent =
+ CleanFilesPostEvent(carbonTable,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
index 1ea4508..9ea4018 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByIdCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
case class DeleteLoadByIdCommand(
loadIds: Seq[String],
@@ -37,12 +38,26 @@ case class DeleteLoadByIdCommand(
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
+ val operationContext = new OperationContext
+
+ val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
+ DeleteSegmentByIdPreEvent(carbonTable,
+ loadIds,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(deleteSegmentByIdPreEvent, operationContext)
+
CarbonStore.deleteLoadById(
loadIds,
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
carbonTable
)
+
+ val deleteSegmentPostEvent: DeleteSegmentByIdPostEvent =
+ DeleteSegmentByIdPostEvent(carbonTable,
+ loadIds,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(deleteSegmentByIdPreEvent, operationContext)
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
index 3d06b18..58d8236 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/DeleteLoadByLoadDateCommand.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.command.{Checker, DataProcessCommand, Runn
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
case class DeleteLoadByLoadDateCommand(
databaseNameOp: Option[String],
@@ -39,12 +40,26 @@ case class DeleteLoadByLoadDateCommand(
val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.
lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation].
tableMeta.carbonTable
+ val operationContext = new OperationContext
+ val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
+ DeleteSegmentByDatePreEvent(carbonTable,
+ loadDate,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(deleteSegmentByDatePreEvent, operationContext)
+
CarbonStore.deleteLoadByDate(
loadDate,
GetDB.getDatabaseName(databaseNameOp, sparkSession),
tableName,
carbonTable
)
+
+ val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =
+ DeleteSegmentByDatePostEvent(carbonTable,
+ loadDate,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(deleteSegmentByDatePreEvent, operationContext)
+
Seq.empty
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index af971d0..764deb7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -25,8 +25,8 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.FailureCauses
-
/**
* IUD update delete and compaction framework.
*
@@ -52,6 +52,13 @@ private[sql] case class ProjectForDeleteCommand(
.lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
asInstanceOf[CarbonRelation]
val carbonTable = relation.tableMeta.carbonTable
+
+ // trigger event for Delete from table
+ val operationContext = new OperationContext
+ val deleteFromTablePreEvent: DeleteFromTablePreEvent =
+ DeleteFromTablePreEvent(carbonTable)
+ OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
+
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.METADATA_LOCK)
@@ -76,6 +83,11 @@ private[sql] case class ProjectForDeleteCommand(
// call IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
isUpdateOperation = false)
+
+ // trigger post event for Delete from table
+ val deleteFromTablePostEvent: DeleteFromTablePostEvent =
+ DeleteFromTablePostEvent(carbonTable)
+ OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
}
} catch {
case e: HorizontalCompactionException =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index faeb3af..e48693b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, Lock
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
private[sql] case class ProjectForUpdateCommand(
@@ -58,6 +59,13 @@ private[sql] case class ProjectForUpdateCommand(
.lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
asInstanceOf[CarbonRelation]
val carbonTable = relation.tableMeta.carbonTable
+
+ // trigger event for Update table
+ val operationContext = new OperationContext
+ val updateTablePreEvent: UpdateTablePreEvent =
+ UpdateTablePreEvent(carbonTable)
+ OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
+
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
LockUsage.METADATA_LOCK)
@@ -111,6 +119,11 @@ private[sql] case class ProjectForUpdateCommand(
// Do IUD Compaction.
HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+
+ // trigger event for Update table
+ val updateTablePostEvent: UpdateTablePostEvent =
+ UpdateTablePostEvent(carbonTable)
+ OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
} catch {
case e: HorizontalCompactionException =>
LOGGER.error(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 3ac23f7..0b737bf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD
@@ -99,6 +100,15 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
sys.error(s"Alter drop operation failed. AtLeast one key column should exist after drop.")
}
+
+ val operationContext = new OperationContext
+ // event will be fired before dropping the columns
+ val alterTableDropColumnPreEvent: AlterTableDropColumnPreEvent = AlterTableDropColumnPreEvent(
+ carbonTable,
+ alterTableDropColumnModel,
+ sparkSession)
+ OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPreEvent, operationContext)
+
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
@@ -130,6 +140,15 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
dictionaryColumns,
carbonTable.getCarbonTableIdentifier,
carbonTable.getStorePath).collect()
+
+ // event will be fired before dropping the columns
+ val alterTableDropColumnPostEvent: AlterTableDropColumnPostEvent =
+ AlterTableDropColumnPostEvent(
+ carbonTable,
+ alterTableDropColumnModel,
+ sparkSession)
+ OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)
+
LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 88cf212..c000488 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -110,6 +111,14 @@ private[sql] case class CarbonAlterTableRenameCommand(
tableInfo,
schemaEvolutionEntry,
tableMeta.tablePath)(sparkSession)
+ val operationContext = new OperationContext
+ val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
+ carbonTable,
+ alterTableRenameModel,
+ newTablePath,
+ sparkSession)
+ OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
+
metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
.runSqlHive(
@@ -119,6 +128,13 @@ private[sql] case class CarbonAlterTableRenameCommand(
s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
s"('tableName'='$newTableName', " +
s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+ val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent(
+ carbonTable,
+ alterTableRenameModel,
+ newTablePath,
+ sparkSession)
+ OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
+
sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
Some(oldDatabaseName)).quotedString)
LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4aa0f493/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 16724fc..9822d8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -42,6 +42,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
+import org.apache.carbondata.events.{LookupRelationPostEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.processing.merger.TableMeta
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -129,6 +130,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
carbonDatasourceHadoopRelation.carbonRelation
case _ => throw new NoSuchTableException(database, tableIdentifier.table)
}
+
+ // fire post event after lookup relation
+ val operationContext = new OperationContext
+ val lookupRelationPostEvent: LookupRelationPostEvent =
+ LookupRelationPostEvent(
+ relation.tableMeta.carbonTable,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(lookupRelationPostEvent, operationContext)
relation
}