Posted to commits@hive.apache.org by sp...@apache.org on 2017/04/04 14:43:29 UTC

[1/2] hive git commit: HIVE-16164: Provide mechanism for passing HMS notification ID between transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)

Repository: hive
Updated Branches:
  refs/heads/master 2985262b8 -> aa29cd9d6


http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
new file mode 100644
index 0000000..20011cc
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+
+/**
+ * This class is used to notify a list of listeners about specific MetaStore events.
+ */
+@Private
+public class MetaStoreListenerNotifier {
+  private interface EventNotifier {
+    void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException;
+  }
+
+  private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap(
+      ImmutableMap.<EventType, EventNotifier>builder()
+          .put(EventType.CREATE_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateDatabase((CreateDatabaseEvent)event);
+            }
+          })
+          .put(EventType.DROP_DATABASE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropDatabase((DropDatabaseEvent)event);
+            }
+          })
+          .put(EventType.CREATE_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateTable((CreateTableEvent)event);
+            }
+          })
+          .put(EventType.DROP_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropTable((DropTableEvent)event);
+            }
+          })
+          .put(EventType.ADD_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAddPartition((AddPartitionEvent)event);
+            }
+          })
+          .put(EventType.DROP_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropPartition((DropPartitionEvent)event);
+            }
+          })
+          .put(EventType.ALTER_TABLE, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterTable((AlterTableEvent)event);
+            }
+          })
+          .put(EventType.ALTER_PARTITION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterPartition((AlterPartitionEvent)event);
+            }
+          })
+          .put(EventType.INSERT, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onInsert((InsertEvent)event);
+            }
+          })
+          .put(EventType.CREATE_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onCreateFunction((CreateFunctionEvent)event);
+            }
+          })
+          .put(EventType.DROP_FUNCTION, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropFunction((DropFunctionEvent)event);
+            }
+          })
+          .put(EventType.CREATE_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAddIndex((AddIndexEvent)event);
+            }
+          })
+          .put(EventType.DROP_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onDropIndex((DropIndexEvent)event);
+            }
+          })
+          .put(EventType.ALTER_INDEX, new EventNotifier() {
+            @Override
+            public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+              listener.onAlterIndex((AlterIndexEvent)event);
+            }
+          })
+          .build()
+  );
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting key/value parameters; those updated parameters are returned to
+   * the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @return A map with the key/value parameters set by the listeners. The map is empty if no
+   *         parameters were updated or no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event) throws MetaException {
+
+    Preconditions.checkNotNull(listeners, "Listeners must not be null.");
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    for (MetaStoreEventListener listener : listeners) {
+      notificationEvents.get(eventType).notify(listener, event);
+    }
+
+    // Each listener called above might set a different parameter on the event.
+    // Allowing listeners to write parameters on the event avoids breaking API compatibility
+    // if new information needs to be passed between them later.
+    return event.getParameters();
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting key/value parameters; those updated parameters are returned to
+   * the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+   * @return A map with the key/value parameters set by the listeners. The map is empty if no
+   *         parameters were updated or no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext environmentContext) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.setEnvironmentContext(environmentContext);
+    return notifyEvent(listeners, eventType, event);
+  }
+
+  /**
+   * Notify a list of listeners about a specific metastore event. Each notified listener may update
+   * the ListenerEvent by setting key/value parameters; those updated parameters are returned to
+   * the caller.
+   *
+   * @param listeners List of MetaStoreEventListener listeners.
+   * @param eventType Type of the notification event.
+   * @param event The ListenerEvent with information about the event.
+   * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+   * @param parameters A map of key/value pairs with new parameters to add to the event.
+   * @return A map with the key/value parameters set by the listeners. The map is empty if no
+   *         parameters were updated or no listeners were notified.
+   * @throws MetaException If an error occurred while calling the listeners.
+   */
+  public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+                                                EventType eventType,
+                                                ListenerEvent event,
+                                                EnvironmentContext environmentContext,
+                                                Map<String, String> parameters) throws MetaException {
+
+    Preconditions.checkNotNull(event, "The event must not be null.");
+
+    event.putParameters(parameters);
+    return notifyEvent(listeners, eventType, event, environmentContext);
+  }
+}
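
As a rough usage sketch (not the actual HiveMetaStore code in this commit), a caller such as HMSHandler could notify the transactional listeners inside the metastore transaction, collect the parameters they set (including the notification ID written by DbNotificationListener), and hand them to the non-transactional listeners afterwards. The class, method, and variable names below are illustrative only:

import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;

class ListenerNotifierUsageSketch {
  Map<String, String> fireCreateTable(List<MetaStoreEventListener> transactionalListeners,
                                      List<MetaStoreEventListener> listeners,
                                      CreateTableEvent txnEvent,
                                      CreateTableEvent postCommitEvent,
                                      EnvironmentContext envContext) throws MetaException {
    // Inside the metastore transaction: DbNotificationListener writes the NotificationEvent
    // and records its ID on txnEvent via putParameter(); notifyEvent returns those parameters.
    Map<String, String> txnResponses = MetaStoreListenerNotifier.notifyEvent(
        transactionalListeners, EventType.CREATE_TABLE, txnEvent, envContext);

    // After the transaction: replay the collected parameters to the non-transactional
    // listeners through a separate event instance (event parameters are write-once).
    return MetaStoreListenerNotifier.notifyEvent(
        listeners, EventType.CREATE_TABLE, postCommitEvent, envContext, txnResponses);
  }
}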

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
index 62aeb8c..b741549 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
@@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Base class for all the events which are defined for metastore.
+ *
+ * This class is not thread-safe and not expected to be called in parallel.
  */
 
+@NotThreadSafe
 public abstract class ListenerEvent {
 
   /**
@@ -33,6 +41,26 @@ public abstract class ListenerEvent {
   private final boolean status;
   private final HMSHandler handler;
 
+  /**
+   * Key/value parameters used by listeners to store notification results,
+   * e.g. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID.
+   *
+   * NotThreadSafe: The parameters map is not expected to be accessed in parallel by Hive, so it is
+   * kept thread-unsafe to avoid locking overhead.
+   */
+  private Map<String, String> parameters;
+
+  /** For performance reasons, the unmodifiable view of the parameters returned by
+   * {@link #getParameters()} is cached. {@link #putParameter(String, String)} is expected to be
+   * called fewer times than {@link #getParameters()}, so caching the view avoids rebuilding it
+   * on every read.
+   */
+  private Map<String, String> unmodifiableParameters;
+
+  // Listener parameters aren't expected to have many values. So far only DbNotificationListener
+  // adds a parameter, so use a low initial capacity for now. If many parameters end up being
+  // added, this initial capacity can be adjusted or removed.
+  private static final int PARAMETERS_INITIAL_CAPACITY = 1;
+
   // Properties passed by the client, to be used in execution hooks.
   private EnvironmentContext environmentContext = null;
 
@@ -40,6 +68,8 @@ public abstract class ListenerEvent {
     super();
     this.status = status;
     this.handler = handler;
+    this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY);
+    updateUnmodifiableParameters();
   }
 
   /**
@@ -49,6 +79,12 @@ public abstract class ListenerEvent {
     return status;
   }
 
+  /**
+   * Set the environment context of the event.
+   *
+   * @param environmentContext An EnvironmentContext object that contains environment parameters sent from
+   *                           the HMS client.
+   */
   public void setEnvironmentContext(EnvironmentContext environmentContext) {
     this.environmentContext = environmentContext;
   }
@@ -66,4 +102,74 @@ public abstract class ListenerEvent {
   public HMSHandler getHandler() {
     return handler;
   }
+
+  /**
+   * Return all parameters of the listener event as a read-only (unmodifiable) map. To add a new
+   * parameter, use the putParameter() method.
+   *
+   * @return A map object with all parameters.
+   */
+  public final Map<String, String> getParameters() {
+    return unmodifiableParameters;
+  }
+
+  /**
+   * Put a new parameter on the listener event.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * where different listeners set the same parameter.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if the parameter already exists.
+   */
+  public void putParameter(String name, String value) {
+    putParameterIfAbsent(name, value);
+    updateUnmodifiableParameters();
+  }
+
+  /**
+   * Put a new set of parameters on the listener event.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * where different listeners set the same parameter.
+   *
+   * @param parameters A Map object with a set of parameters.
+   * @throws IllegalStateException if any of the parameters already exists.
+   */
+  public void putParameters(final Map<String, String> parameters) {
+    if (parameters != null) {
+      for (Map.Entry<String, String> entry : parameters.entrySet()) {
+        putParameterIfAbsent(entry.getKey(), entry.getValue());
+      }
+
+      updateUnmodifiableParameters();
+    }
+  }
+
+  /**
+   * Put a parameter on the listener event, failing if the parameter is already present.
+   *
+   * Overriding parameters is not allowed; an exception is thrown to avoid a misconfiguration
+   * where different listeners set the same parameter.
+   *
+   * @param name Name of the parameter.
+   * @param value Value of the parameter.
+   * @throws IllegalStateException if the parameter already exists.
+   */
+  private void putParameterIfAbsent(String name, String value) {
+    if (parameters.containsKey(name)) {
+      throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name);
+    }
+
+    parameters.put(name, value);
+  }
+
+  /**
+   * Refreshes the cached unmodifiable view of the parameters returned by the getParameters() method.
+   */
+  private void updateUnmodifiableParameters() {
+    unmodifiableParameters = Collections.unmodifiableMap(parameters);
+  }
 }
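
A small illustrative sketch (not part of the patch) of the write-once parameter contract that ListenerEvent now enforces; the helper class and the literal key string below are assumptions for the example:

import java.util.Map;

import org.apache.hadoop.hive.metastore.events.ListenerEvent;

class ListenerEventParameterSketch {
  // Hypothetical key name used only for this example.
  private static final String EVENT_ID_KEY = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";

  static void recordEventId(ListenerEvent event, long notificationEventId) {
    // First write succeeds and is visible to every listener invoked later with this event.
    event.putParameter(EVENT_ID_KEY, Long.toString(notificationEventId));

    // getParameters() returns an unmodifiable view; readers cannot mutate it directly.
    Map<String, String> params = event.getParameters();
    System.out.println("Recorded notification id: " + params.get(EVENT_ID_KEY));

    // Writing the same key again would throw IllegalStateException, which stops two
    // listeners from silently overwriting each other's results:
    // event.putParameter(EVENT_ID_KEY, "other");   // would throw
  }
}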

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 1f87eeb..9b8eaf2 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
 import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
 import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InvalidInputException;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -116,6 +121,51 @@ public class TestObjectStore {
   }
 
   /**
+   * Test notification operations
+   */
+  @Test
+  public void testNotificationOps() throws InterruptedException {
+    final int NO_EVENT_ID = 0;
+    final int FIRST_EVENT_ID = 1;
+    final int SECOND_EVENT_ID = 2;
+
+    NotificationEvent event =
+        new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), "");
+    NotificationEventResponse eventResponse;
+    CurrentNotificationEventId eventId;
+
+    // Verify that there are no notifications available yet
+    eventId = objectStore.getCurrentNotificationEventId();
+    Assert.assertEquals(NO_EVENT_ID, eventId.getEventId());
+
+    // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID
+    objectStore.addNotificationEvent(event);
+    Assert.assertEquals(FIRST_EVENT_ID, event.getEventId());
+    objectStore.addNotificationEvent(event);
+    Assert.assertEquals(SECOND_EVENT_ID, event.getEventId());
+
+    // Verify that objectStore fetches the latest notification event ID
+    eventId = objectStore.getCurrentNotificationEventId();
+    Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId());
+
+    // Verify that getNextNotification() returns all events
+    eventResponse = objectStore.getNextNotification(new NotificationEventRequest());
+    Assert.assertEquals(2, eventResponse.getEventsSize());
+    Assert.assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId());
+    Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId());
+    // Verify that getNextNotification(last) returns events after a specified event
+    eventResponse = objectStore.getNextNotification(new NotificationEventRequest(FIRST_EVENT_ID));
+    Assert.assertEquals(1, eventResponse.getEventsSize());
+    Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId());
+
+    // Verify that cleanNotificationEvents() cleans up all old notifications
+    Thread.sleep(1);
+    objectStore.cleanNotificationEvents(1);
+    eventResponse = objectStore.getNextNotification(new NotificationEventRequest());
+    Assert.assertEquals(0, eventResponse.getEventsSize());
+  }
+
+  /**
    * Test database operations
    */
   @Test


[2/2] hive git commit: HIVE-16164: Provide mechanism for passing HMS notification ID between transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)

Posted by sp...@apache.org.
HIVE-16164: Provide mechanism for passing HMS notification ID between transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa29cd9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa29cd9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa29cd9d

Branch: refs/heads/master
Commit: aa29cd9d6044edb7d425b99ff347c92079475d1d
Parents: 2985262
Author: Sergio Pena <se...@cloudera.com>
Authored: Tue Apr 4 09:42:06 2017 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Tue Apr 4 09:43:05 2017 -0500

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  46 +-
 .../MetaStoreEventListenerConstants.java        |  33 ++
 .../listener/TestDbNotificationListener.java    | 190 +++++++
 .../hadoop/hive/metastore/HiveAlterHandler.java |  60 +--
 .../hadoop/hive/metastore/HiveMetaStore.java    | 529 +++++++++++--------
 .../metastore/MetaStoreListenerNotifier.java    | 224 ++++++++
 .../hive/metastore/events/ListenerEvent.java    | 106 ++++
 .../hadoop/hive/metastore/TestObjectStore.java  |  50 ++
 8 files changed, 959 insertions(+), 279 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index ea6cb79..bbfbc36 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.DropTableEvent;
 import org.apache.hadoop.hive.metastore.events.InsertEvent;
 import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -137,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -152,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropTableMessage(t).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   /**
@@ -168,7 +169,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildAlterTableMessage(before, after).toString());
     event.setDbName(after.getDbName());
     event.setTableName(after.getTableName());
-    process(event);
+    process(event, tableEvent);
   }
 
   class FileIterator implements Iterator<String> {
@@ -276,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -291,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
     event.setDbName(t.getDbName());
     event.setTableName(t.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -307,7 +308,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
     event.setDbName(before.getDbName());
     event.setTableName(before.getTableName());
-    process(event);
+    process(event, partitionEvent);
   }
 
   /**
@@ -321,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
             .buildCreateDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -335,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
             .buildDropDatabaseMessage(db).toString());
     event.setDbName(db.getName());
-    process(event);
+    process(event, dbEvent);
   }
 
   /**
@@ -349,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
             .buildCreateFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -363,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
             .buildDropFunctionMessage(fn).toString());
     event.setDbName(fn.getDbName());
-    process(event);
+    process(event, fnEvent);
   }
 
   /**
@@ -377,7 +378,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
             .buildCreateIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -391,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
             .buildDropIndexMessage(index).toString());
     event.setDbName(index.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   /**
@@ -406,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
         new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
             .buildAlterIndexMessage(before, after).toString());
     event.setDbName(before.getDbName());
-    process(event);
+    process(event, indexEvent);
   }
 
   class FileChksumIterator implements Iterator<String> {
@@ -443,7 +444,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
             .toString());
     event.setDbName(insertEvent.getDb());
     event.setTableName(insertEvent.getTable());
-    process(event);
+    process(event, insertEvent);
   }
 
   /**
@@ -467,14 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener {
     return (int)millis;
   }
 
-  // Process this notification by adding it to metastore DB
-  private void process(NotificationEvent event) throws MetaException {
+  /**
+   * Process this notification by adding it to the metastore DB.
+   *
+   * @param event The NotificationEvent object written to the metastore DB.
+   * @param listenerEvent The ListenerEvent from which the NotificationEvent was built, used only to set
+   *                      the DB_NOTIFICATION_EVENT_ID_KEY_NAME parameter for future reference by other listeners.
+   */
+  private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
     event.setMessageFormat(msgFactory.getMessageFormat());
     synchronized (NOTIFICATION_TBL_LOCK) {
       LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
           event.getMessage());
       HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
     }
+
+    // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+    if (event.isSetEventId()) {
+      listenerEvent.putParameter(
+          MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+          Long.toString(event.getEventId()));
+    }
   }
 
   private static class CleanerThread extends Thread {
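
For the ID to be propagated this way, DbNotificationListener has to run as a transactional listener while the consumers stay on the regular listener list, as the integration test further below configures. A minimal HiveConf sketch; com.example.MyListener is a placeholder for your own MetaStoreEventListener implementation:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.hcatalog.listener.DbNotificationListener;

class ListenerConfigSketch {
  static HiveConf buildConf() {
    HiveConf conf = new HiveConf();
    // Runs inside the metastore transaction and records the notification event ID.
    conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
        DbNotificationListener.class.getName());
    // Non-transactional listeners receive that ID through the ListenerEvent parameters.
    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, "com.example.MyListener");
    return conf;
  }
}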

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
new file mode 100644
index 0000000..a4f2d59
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+/**
+ * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
+ * parameters.
+ */
+public class MetaStoreEventListenerConstants {
+  /*
+   * DbNotificationListener keys reserved for updating ListenerEvent parameters.
+   *
+   * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key holds the event identifier that DbNotificationListener
+   *                                   generated while processing an event. This event identifier may be
+   *                                   shared with other MetaStoreEventListener implementations.
+   */
+  public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
+}
\ No newline at end of file
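
As an illustration of how a non-transactional listener might consume this reserved key, here is a hedged sketch along the lines of the MockMetaStoreEventListener in the test below; the class name is hypothetical:

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hive.hcatalog.listener.MetaStoreEventListenerConstants;

public class EventIdLoggingListener extends MetaStoreEventListener {
  public EventIdLoggingListener(Configuration config) {
    super(config);
  }

  @Override
  public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
    Map<String, String> parameters = tableEvent.getParameters();
    String eventId =
        parameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME);
    if (eventId != null) {
      // The ID written by DbNotificationListener inside the metastore transaction.
      System.out.println("CREATE_TABLE recorded as notification event " + eventId);
    }
  }
}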

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 1cf47c3..50d8878 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -31,13 +31,16 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.FireEventRequest;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.FunctionType;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -56,6 +60,21 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
@@ -75,6 +94,8 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.data.Pair;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -97,12 +118,105 @@ public class TestDbNotificationListener {
   private int startTime;
   private long firstEventId;
 
+  /* This class is used to verify that HiveMetaStore calls the non-transactional listeners with the
+   * current event ID set by the DbNotificationListener class. */
+  public static class MockMetaStoreEventListener extends MetaStoreEventListener {
+    private static Stack<Pair<EventType, String>> eventsIds = new Stack<>();
+
+    private static void pushEventId(EventType eventType, final ListenerEvent event) {
+      if (event.getStatus()) {
+        Map<String, String> parameters = event.getParameters();
+        if (parameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
+          Pair<EventType, String> pair =
+              new Pair<>(eventType, parameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
+          eventsIds.push(pair);
+        }
+      }
+    }
+
+    public static void popAndVerifyLastEventId(EventType eventType, long id) {
+      if (!eventsIds.isEmpty()) {
+        Pair<EventType, String> pair = eventsIds.pop();
+
+        assertEquals("Last event type does not match.", eventType, pair.first);
+        assertEquals("Last event ID does not match.", Long.toString(id), pair.second);
+      } else {
+        assertTrue("List of events is empty.", false);
+      }
+    }
+
+    public static void clearEvents() {
+      eventsIds.clear();
+    }
+
+    public MockMetaStoreEventListener(Configuration config) {
+      super(config);
+    }
+
+    public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
+      pushEventId(EventType.CREATE_TABLE, tableEvent);
+    }
+
+    public void onDropTable (DropTableEvent tableEvent)  throws MetaException {
+      pushEventId(EventType.DROP_TABLE, tableEvent);
+    }
+
+    public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+      pushEventId(EventType.ALTER_TABLE, tableEvent);
+    }
+
+    public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
+      pushEventId(EventType.ADD_PARTITION, partitionEvent);
+    }
+
+    public void onDropPartition (DropPartitionEvent partitionEvent)  throws MetaException {
+      pushEventId(EventType.DROP_PARTITION, partitionEvent);
+    }
+
+    public void onAlterPartition (AlterPartitionEvent partitionEvent)  throws MetaException {
+      pushEventId(EventType.ALTER_PARTITION, partitionEvent);
+    }
+
+    public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException {
+      pushEventId(EventType.CREATE_DATABASE, dbEvent);
+    }
+
+    public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+      pushEventId(EventType.DROP_DATABASE, dbEvent);
+    }
+
+    public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.CREATE_INDEX, indexEvent);
+    }
+
+    public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.DROP_INDEX, indexEvent);
+    }
+
+    public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
+      pushEventId(EventType.ALTER_INDEX, indexEvent);
+    }
+
+    public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+      pushEventId(EventType.CREATE_FUNCTION, fnEvent);
+    }
+
+    public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+      pushEventId(EventType.DROP_FUNCTION, fnEvent);
+    }
+
+    public void onInsert(InsertEvent insertEvent) throws MetaException {
+      pushEventId(EventType.INSERT, insertEvent);
+    }
+  }
+
   @SuppressWarnings("rawtypes")
   @BeforeClass
   public static void connectToMetastore() throws Exception {
     HiveConf conf = new HiveConf();
     conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
         DbNotificationListener.class.getName());
+    conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, MockMetaStoreEventListener.class.getName());
     conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s");
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -139,6 +253,12 @@ public class TestDbNotificationListener {
     DummyRawStoreFailEvent.setEventSucceed(true);
   }
 
+  @After
+  public void tearDown() {
+    MockMetaStoreEventListener.clearEvents();
+  }
+
   @Test
   public void createDatabase() throws Exception {
     String dbName = "createdb";
@@ -164,6 +284,9 @@ public class TestDbNotificationListener {
     CreateDatabaseMessage createDbMsg = md.getCreateDatabaseMessage(event.getMessage());
     assertEquals(dbName, createDbMsg.getDB());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -206,6 +329,10 @@ public class TestDbNotificationListener {
     DropDatabaseMessage dropDbMsg = md.getDropDatabaseMessage(event.getMessage());
     assertEquals(dbName, dropDbMsg.getDB());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_DATABASE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
@@ -256,6 +383,9 @@ public class TestDbNotificationListener {
     assertEquals(tblName, createTblMsg.getTable());
     assertEquals(table, createTblMsg.getTableObj());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     table =
@@ -312,6 +442,9 @@ public class TestDbNotificationListener {
     AlterTableMessage alterTableMessage = md.getAlterTableMessage(event.getMessage());
     assertEquals(table, alterTableMessage.getTableObjAfter());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -363,6 +496,10 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, dropTblMsg.getDB());
     assertEquals(tblName, dropTblMsg.getTable());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_TABLE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     table =
@@ -428,6 +565,10 @@ public class TestDbNotificationListener {
     assertTrue(ptnIter.hasNext());
     assertEquals(partition, ptnIter.next());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     partition =
@@ -494,6 +635,10 @@ public class TestDbNotificationListener {
     assertEquals(tblName, alterPtnMsg.getTable());
     assertEquals(newPart, alterPtnMsg.getPtnObjAfter());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -557,6 +702,11 @@ public class TestDbNotificationListener {
     assertEquals(table.getTableName(), tableObj.getTableName());
     assertEquals(table.getOwner(), tableObj.getOwner());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     List<String> newpartCol1Vals = Arrays.asList("tomorrow");
@@ -653,6 +803,13 @@ public class TestDbNotificationListener {
     Iterator<Map<String, String>> parts = dropPtnMsg.getPartitions().iterator();
     assertTrue(parts.hasNext());
     assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values()));
+
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 5);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 4);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
   }
 
   @Test
@@ -693,6 +850,9 @@ public class TestDbNotificationListener {
     assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
     assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -742,6 +902,10 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, dropFuncMsg.getDB());
     assertEquals(funcName, dropFuncMsg.getFunctionName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_FUNCTION, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     func =
@@ -807,6 +971,11 @@ public class TestDbNotificationListener {
     assertEquals(tableName, indexObj.getOrigTableName());
     assertEquals(indexTableName, indexObj.getIndexTableName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -873,6 +1042,12 @@ public class TestDbNotificationListener {
     assertEquals(indexTableName.toLowerCase(), dropIdxMsg.getIndexTableName());
     assertEquals(tableName.toLowerCase(), dropIdxMsg.getOrigTableName());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_INDEX, firstEventId + 4);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     index =
@@ -947,6 +1122,12 @@ public class TestDbNotificationListener {
     assertEquals(indexTableName, indexObj.getIndexTableName());
     assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime());
 
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ALTER_INDEX, firstEventId + 4);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
     // When hive.metastore.transactional.event.listeners is set,
     // a failed event should not create a new notification
     DummyRawStoreFailEvent.setEventSucceed(false);
@@ -1003,6 +1184,10 @@ public class TestDbNotificationListener {
     assertEquals(tblName, event.getTableName());
     // Parse the message field
     verifyInsert(event, defaultDbName, tblName);
+
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
   }
 
   @Test
@@ -1063,6 +1248,11 @@ public class TestDbNotificationListener {
     Map<String,String> partKeyValsFromNotif = insertMessage.getPartitionKeyValues();
 
     assertMapEquals(partKeyVals, partKeyValsFromNotif);
+
+    // Verify the eventID was passed to the non-transactional listener
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+    MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 4ce6a65..d0511ad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -268,12 +269,11 @@ public class HiveAlterHandler implements AlterHandler {
       }
 
       alterTableUpdateTableColumnStats(msdb, oldt, newt);
-      if (transactionalListeners != null && transactionalListeners.size() > 0) {
-        AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, true, handler);
-        alterTableEvent.setEnvironmentContext(environmentContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAlterTable(alterTableEvent);
-        }
+      if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                              EventMessage.EventType.ALTER_TABLE,
+                                              new AlterTableEvent(oldt, newt, true, handler),
+                                              environmentContext);
       }
       // commit the changes
       success = msdb.commitTransaction();
@@ -381,13 +381,13 @@ public class HiveAlterHandler implements AlterHandler {
 
         updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
-        if (transactionalListeners != null && transactionalListeners.size() > 0) {
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
-          alterPartitionEvent.setEnvironmentContext(environmentContext);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterPartition(alterPartitionEvent);
-          }
+        if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                EventMessage.EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+                                                environmentContext);
         }
         success = msdb.commitTransaction();
       } catch (InvalidObjectException e) {
@@ -499,13 +499,11 @@ public class HiveAlterHandler implements AlterHandler {
         }
       }
 
-      if (transactionalListeners != null && transactionalListeners.size() > 0) {
-        AlterPartitionEvent alterPartitionEvent =
-            new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
-        alterPartitionEvent.setEnvironmentContext(environmentContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAlterPartition(alterPartitionEvent);
-        }
+      if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                              EventMessage.EventType.ALTER_PARTITION,
+                                              new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+                                              environmentContext);
       }
 
       success = msdb.commitTransaction();
@@ -534,13 +532,11 @@ public class HiveAlterHandler implements AlterHandler {
           try {
             msdb.openTransaction();
             msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
-            if (transactionalListeners != null && transactionalListeners.size() > 0) {
-              AlterPartitionEvent alterPartitionEvent =
-                  new AlterPartitionEvent(new_part, oldPart, tbl, true, handler);
-              alterPartitionEvent.setEnvironmentContext(environmentContext);
-              for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-                transactionalListener.onAlterPartition(alterPartitionEvent);
-              }
+            if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventMessage.EventType.ALTER_PARTITION,
+                                                    new AlterPartitionEvent(new_part, oldPart, tbl, success, handler),
+                                                    environmentContext);
             }
 
             revertMetaDataTransaction = msdb.commitTransaction();
@@ -625,12 +621,10 @@ public class HiveAlterHandler implements AlterHandler {
               "when invoking MetaStoreEventListener for alterPartitions event.");
         }
 
-        if (transactionalListeners != null && transactionalListeners.size() > 0) {
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, newPart, tbl, true, handler);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterPartition(alterPartitionEvent);
-          }
+        if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                EventMessage.EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, newPart, tbl, true, handler));
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
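Note on the pattern applied throughout the HiveMetaStore.java hunks below: the Map<String, String> returned by MetaStoreListenerNotifier.notifyEvent() for the transactional listeners (fired while the metastore transaction is still open) is captured and then handed to the notifyEvent() call for the regular listeners after commit/rollback, so both listener groups observe the same notification parameters. A condensed sketch of one such path (create_database); the local variable names are illustrative, but the notifyEvent() overloads are the ones used in this patch:

    Map<String, String> txnListenerResponses = Collections.emptyMap();
    boolean success = false;
    try {
      ms.openTransaction();
      ms.createDatabase(db);
      if (!transactionalListeners.isEmpty()) {
        // Fired inside the open transaction; the returned map carries any parameters
        // (e.g. the notification id) that the transactional listeners attached.
        txnListenerResponses =
            MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
                                                  EventType.CREATE_DATABASE,
                                                  new CreateDatabaseEvent(db, true, this));
      }
      success = ms.commitTransaction();
    } finally {
      if (!success) {
        ms.rollbackTransaction();
      }
      if (!listeners.isEmpty()) {
        // Fired after commit/rollback; forwarding the transactional responses lets the
        // non-transactional listeners see the same notification parameters.
        MetaStoreListenerNotifier.notifyEvent(listeners,
                                              EventType.CREATE_DATABASE,
                                              new CreateDatabaseEvent(db, success, this),
                                              null,
                                              txnListenerResponses);
      }
    }

The drop_partitions_req and exchange_partition paths extend the same idea by keeping one response map per dropped partition and pairing it with the matching DropPartitionEvent by index.

For reference, a non-transactional listener that consumes the events notified in these hunks could look like the minimal sketch below. The class is illustrative only and not part of this patch; such a listener would be registered through hive.metastore.event.listeners, while transactional listeners are registered through hive.metastore.transactional.event.listeners.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
    import org.apache.hadoop.hive.metastore.events.InsertEvent;

    /** Illustrative only: logs two of the event types fired by the handler code above. */
    public class LoggingMetaStoreListener extends MetaStoreEventListener {
      public LoggingMetaStoreListener(Configuration config) {
        super(config);
      }

      @Override
      public void onCreateTable(CreateTableEvent event) throws MetaException {
        // getStatus() reports whether the metastore operation itself succeeded.
        System.out.println("CREATE_TABLE " + event.getTable().getTableName()
            + ", succeeded=" + event.getStatus());
      }

      @Override
      public void onInsert(InsertEvent event) throws MetaException {
        System.out.println("INSERT event, succeeded=" + event.getStatus());
      }
    }
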
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 80b1e98..3aabe22 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -57,8 +57,11 @@ import java.util.regex.Pattern;
 
 import javax.jdo.JDOException;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
 import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -112,6 +115,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
 import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
 import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -151,10 +155,6 @@ import com.facebook.fb303.fb_status;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -869,6 +869,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path dbPath = new Path(db.getLocationUri());
       boolean success = false;
       boolean madeDir = false;
+      Map<String, String> transactionalListenersResponses = Collections.emptyMap();
       try {
         firePreEvent(new PreCreateDatabaseEvent(db, this));
         if (!wh.isDir(dbPath)) {
@@ -881,11 +882,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.openTransaction();
         ms.createDatabase(db);
-        if (transactionalListeners.size() > 0) {
-          CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateDatabase(cde);
-          }
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenersResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_DATABASE,
+                                                    new CreateDatabaseEvent(db, true, this));
         }
 
         success = ms.commitTransaction();
@@ -896,8 +898,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(dbPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_DATABASE,
+                                                new CreateDatabaseEvent(db, success, this),
+                                                null,
+                                                transactionalListenersResponses);
         }
       }
     }
@@ -1012,6 +1019,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Database db = null;
       List<Path> tablePaths = new ArrayList<Path>();
       List<Path> partitionPaths = new ArrayList<Path>();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         db = ms.getDatabase(name);
@@ -1094,12 +1102,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.dropDatabase(name)) {
-          if (transactionalListeners.size() > 0) {
-            DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropDatabase(dde);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_DATABASE,
+                                                      new DropDatabaseEvent(db, true, this));
           }
+
           success = ms.commitTransaction();
         }
       } finally {
@@ -1121,8 +1130,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
           // it is not a terrible thing even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onDropDatabase(new DropDatabaseEvent(db, success, this));
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_DATABASE,
+                                                new DropDatabaseEvent(db, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -1380,6 +1394,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       }
 
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       Path tblPath = null;
       boolean success = false, madeDir = false;
       try {
@@ -1440,12 +1455,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
         }
 
-        if (transactionalListeners.size() > 0) {
-          CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
-          createTableEvent.setEnvironmentContext(envContext);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateTable(createTableEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_TABLE,
+                                                    new CreateTableEvent(tbl, true, this),
+                                                    envContext);
         }
 
         success = ms.commitTransaction();
@@ -1456,11 +1471,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             wh.deleteDir(tblPath, true);
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          CreateTableEvent createTableEvent =
-              new CreateTableEvent(tbl, success, this);
-          createTableEvent.setEnvironmentContext(envContext);
-          listener.onCreateTable(createTableEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_TABLE,
+                                                new CreateTableEvent(tbl, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -1625,6 +1642,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Path> partPaths = null;
       Table tbl = null;
       boolean ifPurge = false;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop any partitions
@@ -1678,12 +1696,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
               "Unable to drop index table " + tableName + " for index " + indexName);
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
-            dropTableEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropTable(dropTableEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_TABLE,
+                                                      new DropTableEvent(tbl, deleteData, true, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -1698,10 +1716,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           deleteTableData(tblPath, ifPurge);
           // ok even if the data is not deleted
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
-          dropTableEvent.setEnvironmentContext(envContext);
-          listener.onDropTable(dropTableEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_TABLE,
+                                                new DropTableEvent(tbl, deleteData, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
       return success;
@@ -2165,6 +2186,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, madeDir = false;
       Path partLocation = null;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         part.setDbName(dbName);
@@ -2221,12 +2243,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (ms.addPartition(part)) {
-          if (transactionalListeners.size() > 0) {
-            AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
-            addPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onAddPartition(addPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.ADD_PARTITION,
+                                                      new AddPartitionEvent(tbl, part, true, this),
+                                                      envContext);
           }
 
           success = ms.commitTransaction();
@@ -2239,11 +2261,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(tbl, part, success, this);
-          addPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAddPartition(addPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, part, success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
       return part;
@@ -2388,8 +2411,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final Map<PartValEqWrapper, Boolean> addedPartitions =
           Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
       final List<Partition> newParts = new ArrayList<Partition>();
-      final List<Partition> existingParts = new ArrayList<Partition>();;
+      final List<Partition> existingParts = new ArrayList<Partition>();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2475,7 +2500,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = false;
         // Notification is generated for newly created partitions only. The subset of partitions
         // that already exist (existingParts), will not generate notifications.
-        fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, newParts, true, this));
+        }
+
         success = ms.commitTransaction();
       } finally {
         if (!success) {
@@ -2486,12 +2517,26 @@ public class HiveMetaStore extends ThriftHiveMetastore {
               wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
             }
           }
-          fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, parts, false, this));
+          }
         } else {
-          fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
-          if (existingParts != null) {
-            // The request has succeeded but we failed to add these partitions.
-            fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ADD_PARTITION,
+                                                  new AddPartitionEvent(tbl, newParts, true, this),
+                                                  null,
+                                                  transactionalListenerResponses);
+
+            if (!existingParts.isEmpty()) {
+              // The request has succeeded but we failed to add these partitions.
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, existingParts, false, this));
+            }
           }
         }
       }
@@ -2578,6 +2623,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
           .getPartitionIterator();
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(dbName, tblName);
@@ -2651,7 +2697,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
         //setting success to false to make sure that if the listener fails, rollback happens.
         success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, partitionSpecProxy, true, this));
+        }
+
         success = ms.commitTransaction();
         return addedPartitions.size();
       } finally {
@@ -2664,7 +2717,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             }
           }
         }
-        fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, partitionSpecProxy, true, this),
+                                                null,
+                                                transactionalListenerResponses);
+        }
       }
     }
 
@@ -2769,6 +2829,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
       boolean success = false;
       Table tbl = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         tbl = ms.getTable(part.getDbName(), part.getTableName());
@@ -2793,7 +2854,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Setting success to false to make sure that if the listener fails, rollback happens.
         success = false;
-        fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(tbl, Arrays.asList(part), true, this),
+                                                    envContext);
+
+        }
+
         // we proceed only if we'd actually succeeded anyway, otherwise,
         // we'd have thrown an exception
         success = ms.commitTransaction();
@@ -2801,64 +2871,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!success) {
           ms.rollbackTransaction();
         }
-        fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
-      }
-      return part;
-    }
-
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final List<Partition> parts, final EnvironmentContext envContext, boolean success)
-          throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
 
-    private void fireMetaStoreAddPartitionEvent(final Table tbl,
-        final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
-          throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-            new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
-
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final List<Partition> parts, final EnvironmentContext envContext, boolean success)
-            throws MetaException {
-      if (tbl != null && parts != null && !parts.isEmpty()) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, parts, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
-        }
-      }
-    }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                new AddPartitionEvent(tbl, Arrays.asList(part), success, this),
+                                                envContext,
+                                                transactionalListenerResponses);
 
-    private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
-          final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
-            throws MetaException {
-      if (tbl != null && partitionSpec != null) {
-        AddPartitionEvent addPartitionEvent =
-                new AddPartitionEvent(tbl, partitionSpec, success, this);
-        addPartitionEvent.setEnvironmentContext(envContext);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onAddPartition(addPartitionEvent);
         }
       }
+      return part;
     }
 
-
     @Override
     public Partition add_partition(final Partition part)
         throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2941,6 +2966,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path destPath = new Path(destinationTable.getSd().getLocation(),
           Warehouse.makePartName(partitionKeysPresent, partValsPresent));
       List<Partition> destPartitions = new ArrayList<Partition>();
+
+      Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap();
+      List<Map<String, String>> transactionalListenerResponsesForDropPartition =
+          Lists.newArrayListWithCapacity(partitionsToExchange.size());
+
       try {
         for (Partition partition: partitionsToExchange) {
           Partition destPartition = new Partition(partition);
@@ -2968,8 +2998,22 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Setting success to false to make sure that if the listener fails, rollback happens.
         success = false;
-        fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
-            destinationTable, destPartitions, transactionalListeners, true);
+
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponsesForAddPartition =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ADD_PARTITION,
+                                                    new AddPartitionEvent(destinationTable, destPartitions, true, this));
+
+          for (Partition partition : partitionsToExchange) {
+            DropPartitionEvent dropPartitionEvent =
+                new DropPartitionEvent(sourceTable, partition, true, true, this);
+            transactionalListenerResponsesForDropPartition.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      dropPartitionEvent));
+          }
+        }
 
         success = ms.commitTransaction();
         return destPartitions;
@@ -2979,34 +3023,31 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           if (pathCreated) {
             wh.renameDir(destPath, sourcePath);
           }
-
-          fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
-              destinationTable, destPartitions, listeners, success);
         }
-      }
-    }
 
-    private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
-        List<Partition> partitionsToExchange, Table destinationTable,
-        List<Partition> destPartitions,
-        List<MetaStoreEventListener> eventListeners,
-        boolean status) throws MetaException {
-      if (sourceTable != null && destinationTable != null
-          && !CollectionUtils.isEmpty(partitionsToExchange)
-          && !CollectionUtils.isEmpty(destPartitions)) {
-        if (eventListeners.size() > 0) {
-          AddPartitionEvent addPartitionEvent =
-              new AddPartitionEvent(destinationTable, destPartitions, status, this);
-          for (MetaStoreEventListener eventListener : eventListeners) {
-            eventListener.onAddPartition(addPartitionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this);
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ADD_PARTITION,
+                                                addPartitionEvent,
+                                                null,
+                                                transactionalListenerResponsesForAddPartition);
 
+          i = 0;
           for (Partition partition : partitionsToExchange) {
             DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(sourceTable, partition, true, status, this);
-            for (MetaStoreEventListener eventListener : eventListeners) {
-              eventListener.onDropPartition(dropPartitionEvent);
-            }
+                new DropPartitionEvent(sourceTable, partition, success, true, this);
+            Map<String, String> parameters =
+                (transactionalListenerResponsesForDropPartition.size() > i)
+                    ? transactionalListenerResponsesForDropPartition.get(i)
+                    : null;
+
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.DROP_PARTITION,
+                                                  dropPartitionEvent,
+                                                  null,
+                                                  parameters);
+            i++;
           }
         }
       }
@@ -3024,6 +3065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Path archiveParentDir = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
 
       try {
         ms.openTransaction();
@@ -3056,13 +3098,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
           throw new MetaException("Unable to drop partition");
         } else {
-          if (transactionalListeners.size() > 0) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext);
           }
           success = ms.commitTransaction();
         }
@@ -3090,11 +3132,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
             // ok even if the data is not deleted
           }
         }
-        for (MetaStoreEventListener listener : listeners) {
-          DropPartitionEvent dropPartitionEvent =
-            new DropPartitionEvent(tbl, part, success, deleteData, this);
-          dropPartitionEvent.setEnvironmentContext(envContext);
-          listener.onDropPartition(dropPartitionEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_PARTITION,
+                                                new DropPartitionEvent(tbl, part, success, deleteData, this),
+                                                envContext,
+                                                transactionalListenerResponses);
         }
       }
       return true;
@@ -3156,6 +3199,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       List<Partition> parts = null;
       boolean mustPurge = false;
       boolean isExternalTbl = false;
+      List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
+
       try {
         // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
         // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
@@ -3239,14 +3284,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         ms.dropPartitions(dbName, tblName, partNames);
-        if (parts != null && transactionalListeners.size() > 0) {
+        if (parts != null && !transactionalListeners.isEmpty()) {
           for (Partition part : parts) {
-            DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, true, deleteData, this);
-            dropPartitionEvent.setEnvironmentContext(envContext);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onDropPartition(dropPartitionEvent);
-            }
+            transactionalListenerResponses.add(
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.DROP_PARTITION,
+                                                      new DropPartitionEvent(tbl, part, true, deleteData, this),
+                                                      envContext));
           }
         }
 
@@ -3280,12 +3324,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
         if (parts != null) {
-          for (Partition part : parts) {
-            for (MetaStoreEventListener listener : listeners) {
-              DropPartitionEvent dropPartitionEvent =
-                new DropPartitionEvent(tbl, part, success, deleteData, this);
-              dropPartitionEvent.setEnvironmentContext(envContext);
-              listener.onDropPartition(dropPartitionEvent);
+          int i = 0;
+          if (!listeners.isEmpty()) {
+            for (Partition part : parts) {
+              Map<String, String> parameters =
+                  (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null;
+
+              MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                    EventType.DROP_PARTITION,
+                                                    new DropPartitionEvent(tbl, part, success, deleteData, this),
+                                                    envContext,
+                                                    parameters);
+
+              i++;
             }
           }
         }
@@ -3720,14 +3771,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         // Only fetch the table if we actually have a listener
         Table table = null;
-        for (MetaStoreEventListener listener : listeners) {
+        if (!listeners.isEmpty()) {
           if (table == null) {
             table = getMS().getTable(db_name, tbl_name);
           }
-          AlterPartitionEvent alterPartitionEvent =
-              new AlterPartitionEvent(oldPart, new_part, table, true, this);
-          alterPartitionEvent.setEnvironmentContext(envContext);
-          listener.onAlterPartition(alterPartitionEvent);
+
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_PARTITION,
+                                                new AlterPartitionEvent(oldPart, new_part, table, true, this),
+                                                envContext);
         }
       } catch (InvalidObjectException e) {
         ex = e;
@@ -3791,13 +3843,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           else {
             throw new InvalidOperationException("failed to alterpartitions");
           }
-          for (MetaStoreEventListener listener : listeners) {
-            if (table == null) {
-              table = getMS().getTable(db_name, tbl_name);
-            }
-            AlterPartitionEvent alterPartitionEvent =
-                new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this);
-            listener.onAlterPartition(alterPartitionEvent);
+
+          if (table == null) {
+            table = getMS().getTable(db_name, tbl_name);
+          }
+
+          if (!listeners.isEmpty()) {
+            MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                  EventType.ALTER_PARTITION,
+                                                  new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this));
           }
         }
       } catch (InvalidObjectException e) {
@@ -3834,16 +3888,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Exception ex = null;
       Index oldIndex = null;
       RawStore ms  = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         oldIndex = get_index_by_name(dbname, base_table_name, index_name);
         firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
         ms.alterIndex(dbname, base_table_name, index_name, newIndex);
-        if (transactionalListeners.size() > 0) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onAlterIndex(alterIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.ALTER_INDEX,
+                                                    new AlterIndexEvent(oldIndex, newIndex, true, this));
         }
 
         success = ms.commitTransaction();
@@ -3865,9 +3920,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         endFunction("alter_index", success, ex, base_table_name);
-        for (MetaStoreEventListener listener : listeners) {
-          AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this);
-          listener.onAlterIndex(alterIndexEvent);
+
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_INDEX,
+                                                new AlterIndexEvent(oldIndex, newIndex, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -3935,11 +3994,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
                 envContext, this);
         success = true;
-        for (MetaStoreEventListener listener : listeners) {
-          AlterTableEvent alterTableEvent =
-              new AlterTableEvent(oldt, newTable, success, this);
-          alterTableEvent.setEnvironmentContext(envContext);
-          listener.onAlterTable(alterTableEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.ALTER_TABLE,
+                                                new AlterTableEvent(oldt, newTable, true, this),
+                                                envContext);
         }
       } catch (NoSuchObjectException e) {
         // thrown when the table to be altered does not exist
@@ -4506,6 +4565,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false, indexTableCreated = false;
       String[] qualified =
           MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName());
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         firePreEvent(new PreAddIndexEvent(index, this));
@@ -4543,11 +4603,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         index.setCreateTime((int) time);
         index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
         if (ms.addIndex(index)) {
-          if (transactionalListeners.size() > 0) {
-            AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
-            for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-              transactionalListener.onAddIndex(addIndexEvent);
-            }
+          if (!transactionalListeners.isEmpty()) {
+            transactionalListenerResponses =
+                MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                      EventType.CREATE_INDEX,
+                                                      new AddIndexEvent(index, true, this));
           }
         }
 
@@ -4564,9 +4624,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        for (MetaStoreEventListener listener : listeners) {
-          AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
-          listener.onAddIndex(addIndexEvent);
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_INDEX,
+                                                new AddIndexEvent(index, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -4604,6 +4667,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Index index = null;
       Path tblPath = null;
       List<Path> partPaths = null;
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         // drop the underlying index table
@@ -4636,11 +4700,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        if (transactionalListeners.size() > 0) {
-          DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onDropIndex(dropIndexEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_INDEX,
+                                                    new DropIndexEvent(index, true, this));
         }
 
         success = ms.commitTransaction();
@@ -4653,11 +4717,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           // ok even if the data is not deleted
         }
         // Skip the event listeners if the index is NULL
-        if (index != null) {
-          for (MetaStoreEventListener listener : listeners) {
-            DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this);
-            listener.onDropIndex(dropIndexEvent);
-          }
+        if (index != null && !listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_INDEX,
+                                                new DropIndexEvent(index, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
       return success;
@@ -6093,6 +6158,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       validateFunctionInfo(func);
       boolean success = false;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         Database db = ms.getDatabase(func.getDbName());
@@ -6109,11 +6175,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         long time = System.currentTimeMillis() / 1000;
         func.setCreateTime((int) time);
         ms.createFunction(func);
-        if (transactionalListeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onCreateFunction(createFunctionEvent);
-          }
+        if (!transactionalListeners.isEmpty()) {
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.CREATE_FUNCTION,
+                                                    new CreateFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6122,11 +6188,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           ms.rollbackTransaction();
         }
 
-        if (listeners.size() > 0) {
-          CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onCreateFunction(createFunctionEvent);
-          }
+        if (!listeners.isEmpty()) {
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.CREATE_FUNCTION,
+                                                new CreateFunctionEvent(func, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -6138,6 +6205,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       boolean success = false;
       Function func = null;
       RawStore ms = getMS();
+      Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       try {
         ms.openTransaction();
         func = ms.getFunction(dbName, funcName);
@@ -6147,10 +6215,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
 
         ms.dropFunction(dbName, funcName);
         if (transactionalListeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this);
-          for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-            transactionalListener.onDropFunction(dropFunctionEvent);
-          }
+          transactionalListenerResponses =
+              MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+                                                    EventType.DROP_FUNCTION,
+                                                    new DropFunctionEvent(func, true, this));
         }
 
         success = ms.commitTransaction();
@@ -6160,10 +6228,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
 
         if (listeners.size() > 0) {
-          DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this);
-          for (MetaStoreEventListener listener : listeners) {
-            listener.onDropFunction(dropFunctionEvent);
-          }
+          MetaStoreListenerNotifier.notifyEvent(listeners,
+                                                EventType.DROP_FUNCTION,
+                                                new DropFunctionEvent(func, success, this),
+                                                null,
+                                                transactionalListenerResponses);
         }
       }
     }
@@ -6530,13 +6599,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         InsertEvent event =
             new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst
                 .getData().getInsertData(), rqst.isSuccessful(), this);
-        for (MetaStoreEventListener transactionalListener : transactionalListeners) {
-          transactionalListener.onInsert(event);
-        }
 
-        for (MetaStoreEventListener listener : listeners) {
-          listener.onInsert(event);
-        }
+        /*
+         * The transactional listener response will already be set on the event, so there is no need
+         * to pass the response to the non-transactional listeners.
+         */
+        MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event);
+        MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event);
 
         return new FireEventResponse();