You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/04/04 14:43:29 UTC
[1/2] hive git commit: HIVE-16164: Provide mechanism for passing HMS
notification ID between transactional and non-transactional listeners.
(Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)
Repository: hive
Updated Branches:
refs/heads/master 2985262b8 -> aa29cd9d6
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
new file mode 100644
index 0000000..20011cc
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.common.classification.InterfaceAudience.Private;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+
+/**
+ * This class is used to notify a list of listeners about specific MetaStore events.
+ */
+@Private
+public class MetaStoreListenerNotifier {
+ private interface EventNotifier {
+ void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException;
+ }
+
+ private static Map<EventType, EventNotifier> notificationEvents = Maps.newHashMap(
+ ImmutableMap.<EventType, EventNotifier>builder()
+ .put(EventType.CREATE_DATABASE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateDatabase((CreateDatabaseEvent)event);
+ }
+ })
+ .put(EventType.DROP_DATABASE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropDatabase((DropDatabaseEvent)event);
+ }
+ })
+ .put(EventType.CREATE_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateTable((CreateTableEvent)event);
+ }
+ })
+ .put(EventType.DROP_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropTable((DropTableEvent)event);
+ }
+ })
+ .put(EventType.ADD_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAddPartition((AddPartitionEvent)event);
+ }
+ })
+ .put(EventType.DROP_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropPartition((DropPartitionEvent)event);
+ }
+ })
+ .put(EventType.ALTER_TABLE, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterTable((AlterTableEvent)event);
+ }
+ })
+ .put(EventType.ALTER_PARTITION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterPartition((AlterPartitionEvent)event);
+ }
+ })
+ .put(EventType.INSERT, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onInsert((InsertEvent)event);
+ }
+ })
+ .put(EventType.CREATE_FUNCTION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onCreateFunction((CreateFunctionEvent)event);
+ }
+ })
+ .put(EventType.DROP_FUNCTION, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropFunction((DropFunctionEvent)event);
+ }
+ })
+ .put(EventType.CREATE_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAddIndex((AddIndexEvent)event);
+ }
+ })
+ .put(EventType.DROP_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onDropIndex((DropIndexEvent)event);
+ }
+ })
+ .put(EventType.ALTER_INDEX, new EventNotifier() {
+ @Override
+ public void notify(MetaStoreEventListener listener, ListenerEvent event) throws MetaException {
+ listener.onAlterIndex((AlterIndexEvent)event);
+ }
+ })
+ .build()
+ );
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event) throws MetaException {
+
+ Preconditions.checkNotNull(listeners, "Listeners must not be null.");
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ for (MetaStoreEventListener listener : listeners) {
+ notificationEvents.get(eventType).notify(listener, event);
+ }
+
+ // Each listener called above might set a different parameter on the event.
+ // This write permission is allowed on the listener side to avoid breaking compatibility if we change the API
+ // method calls.
+ return event.getParameters();
+ }
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event,
+ EnvironmentContext environmentContext) throws MetaException {
+
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ event.setEnvironmentContext(environmentContext);
+ return notifyEvent(listeners, eventType, event);
+ }
+
+ /**
+ * Notify a list of listeners about a specific metastore event. Each listener notified might update
+ * the (ListenerEvent) event by setting a parameter key/value pair. These updated parameters will
+ * be returned to the caller.
+ *
+ * @param listeners List of MetaStoreEventListener listeners.
+ * @param eventType Type of the notification event.
+ * @param event The ListenerEvent with information about the event.
+ * @param environmentContext An EnvironmentContext object with parameters sent by the HMS client.
+ * @param parameters A list of key/value pairs with the new parameters to add.
+ * @return A list of key/value pair parameters that the listeners set. The returned object will return an empty
+ * map if no parameters were updated or if no listeners were notified.
+ * @throws MetaException If an error occurred while calling the listeners.
+ */
+ public static Map<String, String> notifyEvent(List<MetaStoreEventListener> listeners,
+ EventType eventType,
+ ListenerEvent event,
+ EnvironmentContext environmentContext,
+ Map<String, String> parameters) throws MetaException {
+
+ Preconditions.checkNotNull(event, "The event must not be null.");
+
+ event.putParameters(parameters);
+ return notifyEvent(listeners, eventType, event, environmentContext);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
index 62aeb8c..b741549 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/ListenerEvent.java
@@ -21,10 +21,18 @@ package org.apache.hadoop.hive.metastore.events;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Base class for all the events which are defined for metastore.
+ *
+ * This class is not thread-safe and not expected to be called in parallel.
*/
+@NotThreadSafe
public abstract class ListenerEvent {
/**
@@ -33,6 +41,26 @@ public abstract class ListenerEvent {
private final boolean status;
private final HMSHandler handler;
+ /**
+ * Key/value parameters used by listeners to store notifications results
+ * i.e. DbNotificationListener sets a DB_NOTIFICATION_EVENT_ID.
+ *
+ * NotThreadSafe: The parameters map is not expected to be access in parallel by Hive, so keep it thread-unsafe
+ * to avoid locking overhead.
+ */
+ private Map<String, String> parameters;
+
+ /** For performance concerns, it is preferable to cache the unmodifiable parameters variable that will be returned on the
+ * {@link #getParameters()} method. It is expected that {@link #putParameter(String, String)} is called less times
+ * than {@link #getParameters()}, so performance may be better by using this cache.
+ */
+ private Map<String, String> unmodifiableParameters;
+
+ // Listener parameters aren't expected to have many values. So far only
+ // DbNotificationListener will add a parameter; let's set a low initial capacity for now.
+ // If we find out many parameters are added, then we can adjust or remove this initial capacity.
+ private static final int PARAMETERS_INITIAL_CAPACITY = 1;
+
// Properties passed by the client, to be used in execution hooks.
private EnvironmentContext environmentContext = null;
@@ -40,6 +68,8 @@ public abstract class ListenerEvent {
super();
this.status = status;
this.handler = handler;
+ this.parameters = new HashMap<>(PARAMETERS_INITIAL_CAPACITY);
+ updateUnmodifiableParameters();
}
/**
@@ -49,6 +79,12 @@ public abstract class ListenerEvent {
return status;
}
+ /**
+ * Set the environment context of the event.
+ *
+ * @param environmentContext An EnvironmentContext object that contains environment parameters sent from
+ * the HMS client.
+ */
public void setEnvironmentContext(EnvironmentContext environmentContext) {
this.environmentContext = environmentContext;
}
@@ -66,4 +102,74 @@ public abstract class ListenerEvent {
public HMSHandler getHandler() {
return handler;
}
+
+ /**
+ * Return all parameters of the listener event. Parameters are read-only (unmodifiable map). If a new parameter
+ * must be added, please use the putParameter() method.
+ *
+ *
+ * @return A map object with all parameters.
+ */
+ public final Map<String, String> getParameters() {
+ return unmodifiableParameters;
+ }
+
+ /**
+ * Put a new parameter to the listener event.
+ *
+ * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration
+ * between listeners setting the same parameters.
+ *
+ * @param name Name of the parameter.
+ * @param value Value of the parameter.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ public void putParameter(String name, String value) {
+ putParameterIfAbsent(name, value);
+ updateUnmodifiableParameters();
+ }
+
+ /**
+ * Put a new set the parameters to the listener event.
+ *
+ * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration
+ * between listeners setting the same parameters.
+ *
+ * @param parameters A Map object with the a set of parameters.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ public void putParameters(final Map<String, String> parameters) {
+ if (parameters != null) {
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ putParameterIfAbsent(entry.getKey(), entry.getValue());
+ }
+
+ updateUnmodifiableParameters();
+ }
+ }
+
+ /**
+ * Put a parameter to the listener event only if the parameter is absent.
+ *
+ * Overridden parameters is not allowed, and an exception may be thrown to avoid a mis-configuration
+ * between listeners setting the same parameters.
+ *
+ * @param name Name of the parameter.
+ * @param value Value of the parameter.
+ * @throws IllegalStateException if a parameter already exists.
+ */
+ private void putParameterIfAbsent(String name, String value) {
+ if (parameters.containsKey(name)) {
+ throw new IllegalStateException("Invalid attempt to overwrite a read-only parameter: " + name);
+ }
+
+ parameters.put(name, value);
+ }
+
+ /**
+ * Keeps a cache of unmodifiable parameters returned by the getParameters() method.
+ */
+ private void updateUnmodifiableParameters() {
+ unmodifiableParameters = Collections.unmodifiableMap(parameters);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
index 1f87eeb..9b8eaf2 100644
--- a/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
+++ b/metastore/src/test/org/apache/hadoop/hive/metastore/TestObjectStore.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FileMetadataExprType;
@@ -37,6 +38,9 @@ import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -116,6 +121,51 @@ public class TestObjectStore {
}
/**
+ * Test notification operations
+ */
+ @Test
+ public void testNotificationOps() throws InterruptedException {
+ final int NO_EVENT_ID = 0;
+ final int FIRST_EVENT_ID = 1;
+ final int SECOND_EVENT_ID = 2;
+
+ NotificationEvent event =
+ new NotificationEvent(0, 0, EventMessage.EventType.CREATE_DATABASE.toString(), "");
+ NotificationEventResponse eventResponse;
+ CurrentNotificationEventId eventId;
+
+ // Verify that there is no notifications available yet
+ eventId = objectStore.getCurrentNotificationEventId();
+ Assert.assertEquals(NO_EVENT_ID, eventId.getEventId());
+
+ // Verify that addNotificationEvent() updates the NotificationEvent with the new event ID
+ objectStore.addNotificationEvent(event);
+ Assert.assertEquals(FIRST_EVENT_ID, event.getEventId());
+ objectStore.addNotificationEvent(event);
+ Assert.assertEquals(SECOND_EVENT_ID, event.getEventId());
+
+ // Verify that objectStore fetches the latest notification event ID
+ eventId = objectStore.getCurrentNotificationEventId();
+ Assert.assertEquals(SECOND_EVENT_ID, eventId.getEventId());
+
+ // Verify that getNextNotification() returns all events
+ eventResponse = objectStore.getNextNotification(new NotificationEventRequest());
+ Assert.assertEquals(2, eventResponse.getEventsSize());
+ Assert.assertEquals(FIRST_EVENT_ID, eventResponse.getEvents().get(0).getEventId());
+ Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(1).getEventId());
+ // Verify that getNextNotification(last) returns events after a specified event
+ eventResponse = objectStore.getNextNotification(new NotificationEventRequest(FIRST_EVENT_ID));
+ Assert.assertEquals(1, eventResponse.getEventsSize());
+ Assert.assertEquals(SECOND_EVENT_ID, eventResponse.getEvents().get(0).getEventId());
+
+ // Verify that cleanNotificationEvents() cleans up all old notifications
+ Thread.sleep(1);
+ objectStore.cleanNotificationEvents(1);
+ eventResponse = objectStore.getNextNotification(new NotificationEventRequest());
+ Assert.assertEquals(0, eventResponse.getEventsSize());
+ }
+
+ /**
* Test database operations
*/
@Test
[2/2] hive git commit: HIVE-16164: Provide mechanism for passing HMS
notification ID between transactional and non-transactional listeners.
(Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)
Posted by sp...@apache.org.
HIVE-16164: Provide mechanism for passing HMS notification ID between transactional and non-transactional listeners. (Sergio Pena, reviewed by Mohit Sabharwal, Alexander Kolbasov)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa29cd9d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa29cd9d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa29cd9d
Branch: refs/heads/master
Commit: aa29cd9d6044edb7d425b99ff347c92079475d1d
Parents: 2985262
Author: Sergio Pena <se...@cloudera.com>
Authored: Tue Apr 4 09:42:06 2017 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Tue Apr 4 09:43:05 2017 -0500
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 46 +-
.../MetaStoreEventListenerConstants.java | 33 ++
.../listener/TestDbNotificationListener.java | 190 +++++++
.../hadoop/hive/metastore/HiveAlterHandler.java | 60 +--
.../hadoop/hive/metastore/HiveMetaStore.java | 529 +++++++++++--------
.../metastore/MetaStoreListenerNotifier.java | 224 ++++++++
.../hive/metastore/events/ListenerEvent.java | 106 ++++
.../hadoop/hive/metastore/TestObjectStore.java | 50 ++
8 files changed, 959 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index ea6cb79..bbfbc36 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
@@ -137,7 +138,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildCreateTableMessage(t, new FileIterator(t.getSd().getLocation())).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, tableEvent);
}
/**
@@ -152,7 +153,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, tableEvent);
}
/**
@@ -168,7 +169,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildAlterTableMessage(before, after).toString());
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
- process(event);
+ process(event, tableEvent);
}
class FileIterator implements Iterator<String> {
@@ -276,7 +277,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -291,7 +292,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -307,7 +308,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
- process(event);
+ process(event, partitionEvent);
}
/**
@@ -321,7 +322,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
.buildCreateDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event);
+ process(event, dbEvent);
}
/**
@@ -335,7 +336,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
.buildDropDatabaseMessage(db).toString());
event.setDbName(db.getName());
- process(event);
+ process(event, dbEvent);
}
/**
@@ -349,7 +350,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
.buildCreateFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event);
+ process(event, fnEvent);
}
/**
@@ -363,7 +364,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
.buildDropFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- process(event);
+ process(event, fnEvent);
}
/**
@@ -377,7 +378,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
.buildCreateIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event);
+ process(event, indexEvent);
}
/**
@@ -391,7 +392,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
.buildDropIndexMessage(index).toString());
event.setDbName(index.getDbName());
- process(event);
+ process(event, indexEvent);
}
/**
@@ -406,7 +407,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
.buildAlterIndexMessage(before, after).toString());
event.setDbName(before.getDbName());
- process(event);
+ process(event, indexEvent);
}
class FileChksumIterator implements Iterator<String> {
@@ -443,7 +444,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
.toString());
event.setDbName(insertEvent.getDb());
event.setTableName(insertEvent.getTable());
- process(event);
+ process(event, insertEvent);
}
/**
@@ -467,14 +468,27 @@ public class DbNotificationListener extends MetaStoreEventListener {
return (int)millis;
}
- // Process this notification by adding it to metastore DB
- private void process(NotificationEvent event) throws MetaException {
+ /**
+ * Process this notification by adding it to metastore DB.
+ *
+ * @param event NotificationEvent is the object written to the metastore DB.
+ * @param listenerEvent ListenerEvent (from which NotificationEvent was based) used only to set the
+ * DB_NOTIFICATION_EVENT_ID_KEY_NAME for future reference by other listeners.
+ */
+ private void process(NotificationEvent event, ListenerEvent listenerEvent) throws MetaException {
event.setMessageFormat(msgFactory.getMessageFormat());
synchronized (NOTIFICATION_TBL_LOCK) {
LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
event.getMessage());
HMSHandler.getMSForConf(hiveConf).addNotificationEvent(event);
}
+
+ // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+ if (event.isSetEventId()) {
+ listenerEvent.putParameter(
+ MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+ Long.toString(event.getEventId()));
+ }
}
private static class CleanerThread extends Thread {
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
new file mode 100644
index 0000000..a4f2d59
--- /dev/null
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/MetaStoreEventListenerConstants.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.hcatalog.listener;
+
+/**
+ * Keeps a list of reserved keys used by Hive listeners when updating the ListenerEvent
+ * parameters.
+ */
+public class MetaStoreEventListenerConstants {
+ /*
+ * DbNotificationListener keys reserved for updating ListenerEvent parameters.
+ *
+ * DB_NOTIFICATION_EVENT_ID_KEY_NAME This key will have the event identifier that DbNotificationListener
+ * processed during an event. This event identifier might be shared
+ * across other MetaStoreEventListener implementations.
+ */
+ public static final String DB_NOTIFICATION_EVENT_ID_KEY_NAME = "DB_NOTIFICATION_EVENT_ID_KEY_NAME";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 1cf47c3..50d8878 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -31,13 +31,16 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Stack;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
@@ -46,6 +49,7 @@ import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.FunctionType;
import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -56,6 +60,21 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.InsertEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterIndexMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
@@ -75,6 +94,8 @@ import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.hcatalog.data.Pair;
+import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -97,12 +118,105 @@ public class TestDbNotificationListener {
private int startTime;
private long firstEventId;
+ /* This class is used to verify that HiveMetaStore calls the non-transactional listeners with the
+ * current event ID set by the DbNotificationListener class */
+ public static class MockMetaStoreEventListener extends MetaStoreEventListener {
+ private static Stack<Pair<EventType, String>> eventsIds = new Stack<>();
+
+ private static void pushEventId(EventType eventType, final ListenerEvent event) {
+ if (event.getStatus()) {
+ Map<String, String> parameters = event.getParameters();
+ if (parameters.containsKey(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME)) {
+ Pair<EventType, String> pair =
+ new Pair<>(eventType, parameters.get(MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME));
+ eventsIds.push(pair);
+ }
+ }
+ }
+
+ public static void popAndVerifyLastEventId(EventType eventType, long id) {
+ if (!eventsIds.isEmpty()) {
+ Pair<EventType, String> pair = eventsIds.pop();
+
+ assertEquals("Last event type does not match.", eventType, pair.first);
+ assertEquals("Last event ID does not match.", Long.toString(id), pair.second);
+ } else {
+ assertTrue("List of events is empty.",false);
+ }
+ }
+
+ public static void clearEvents() {
+ eventsIds.clear();
+ }
+
+ public MockMetaStoreEventListener(Configuration config) {
+ super(config);
+ }
+
+ public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
+ pushEventId(EventType.CREATE_TABLE, tableEvent);
+ }
+
+ public void onDropTable (DropTableEvent tableEvent) throws MetaException {
+ pushEventId(EventType.DROP_TABLE, tableEvent);
+ }
+
+ public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+ pushEventId(EventType.ALTER_TABLE, tableEvent);
+ }
+
+ public void onAddPartition (AddPartitionEvent partitionEvent) throws MetaException {
+ pushEventId(EventType.ADD_PARTITION, partitionEvent);
+ }
+
+ public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaException {
+ pushEventId(EventType.DROP_PARTITION, partitionEvent);
+ }
+
+ public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException {
+ pushEventId(EventType.ALTER_PARTITION, partitionEvent);
+ }
+
+ public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException {
+ pushEventId(EventType.CREATE_DATABASE, dbEvent);
+ }
+
+ public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+ pushEventId(EventType.DROP_DATABASE, dbEvent);
+ }
+
+ public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
+ pushEventId(EventType.CREATE_INDEX, indexEvent);
+ }
+
+ public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
+ pushEventId(EventType.DROP_INDEX, indexEvent);
+ }
+
+ public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
+ pushEventId(EventType.ALTER_INDEX, indexEvent);
+ }
+
+ public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+ pushEventId(EventType.CREATE_FUNCTION, fnEvent);
+ }
+
+ public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+ pushEventId(EventType.DROP_FUNCTION, fnEvent);
+ }
+
+ public void onInsert(InsertEvent insertEvent) throws MetaException {
+ pushEventId(EventType.INSERT, insertEvent);
+ }
+ }
+
@SuppressWarnings("rawtypes")
@BeforeClass
public static void connectToMetastore() throws Exception {
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
DbNotificationListener.class.getName());
+ conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, MockMetaStoreEventListener.class.getName());
conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s");
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
@@ -139,6 +253,12 @@ public class TestDbNotificationListener {
DummyRawStoreFailEvent.setEventSucceed(true);
}
+ @After
+ public void tearDown() {
+ MockMetaStoreEventListener.clearEvents();
+ }
+
+
@Test
public void createDatabase() throws Exception {
String dbName = "createdb";
@@ -164,6 +284,9 @@ public class TestDbNotificationListener {
CreateDatabaseMessage createDbMsg = md.getCreateDatabaseMessage(event.getMessage());
assertEquals(dbName, createDbMsg.getDB());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -206,6 +329,10 @@ public class TestDbNotificationListener {
DropDatabaseMessage dropDbMsg = md.getDropDatabaseMessage(event.getMessage());
assertEquals(dbName, dropDbMsg.getDB());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_DATABASE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_DATABASE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
@@ -256,6 +383,9 @@ public class TestDbNotificationListener {
assertEquals(tblName, createTblMsg.getTable());
assertEquals(table, createTblMsg.getTableObj());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
table =
@@ -312,6 +442,9 @@ public class TestDbNotificationListener {
AlterTableMessage alterTableMessage = md.getAlterTableMessage(event.getMessage());
assertEquals(table, alterTableMessage.getTableObjAfter());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -363,6 +496,10 @@ public class TestDbNotificationListener {
assertEquals(defaultDbName, dropTblMsg.getDB());
assertEquals(tblName, dropTblMsg.getTable());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_TABLE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
table =
@@ -428,6 +565,10 @@ public class TestDbNotificationListener {
assertTrue(ptnIter.hasNext());
assertEquals(partition, ptnIter.next());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
partition =
@@ -494,6 +635,10 @@ public class TestDbNotificationListener {
assertEquals(tblName, alterPtnMsg.getTable());
assertEquals(newPart, alterPtnMsg.getPtnObjAfter());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -557,6 +702,11 @@ public class TestDbNotificationListener {
assertEquals(table.getTableName(), tableObj.getTableName());
assertEquals(table.getOwner(), tableObj.getOwner());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
List<String> newpartCol1Vals = Arrays.asList("tomorrow");
@@ -653,6 +803,13 @@ public class TestDbNotificationListener {
Iterator<Map<String, String>> parts = dropPtnMsg.getPartitions().iterator();
assertTrue(parts.hasNext());
assertEquals(part1.getValues(), Lists.newArrayList(parts.next().values()));
+
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_PARTITION, firstEventId + 5);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 4);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
}
@Test
@@ -693,6 +850,9 @@ public class TestDbNotificationListener {
assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -742,6 +902,10 @@ public class TestDbNotificationListener {
assertEquals(defaultDbName, dropFuncMsg.getDB());
assertEquals(funcName, dropFuncMsg.getFunctionName());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_FUNCTION, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_FUNCTION, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
func =
@@ -807,6 +971,11 @@ public class TestDbNotificationListener {
assertEquals(tableName, indexObj.getOrigTableName());
assertEquals(indexTableName, indexObj.getIndexTableName());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -873,6 +1042,12 @@ public class TestDbNotificationListener {
assertEquals(indexTableName.toLowerCase(), dropIdxMsg.getIndexTableName());
assertEquals(tableName.toLowerCase(), dropIdxMsg.getOrigTableName());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.DROP_INDEX, firstEventId + 4);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
index =
@@ -947,6 +1122,12 @@ public class TestDbNotificationListener {
assertEquals(indexTableName, indexObj.getIndexTableName());
assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime());
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ALTER_INDEX, firstEventId + 4);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_INDEX, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+
// When hive.metastore.transactional.event.listeners is set,
// a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
@@ -1003,6 +1184,10 @@ public class TestDbNotificationListener {
assertEquals(tblName, event.getTableName());
// Parse the message field
verifyInsert(event, defaultDbName, tblName);
+
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
}
@Test
@@ -1063,6 +1248,11 @@ public class TestDbNotificationListener {
Map<String,String> partKeyValsFromNotif = insertMessage.getPartitionKeyValues();
assertMapEquals(partKeyVals, partKeyValsFromNotif);
+
+ // Verify the eventID was passed to the non-transactional listener
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
+ MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
index 4ce6a65..d0511ad 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -268,12 +269,11 @@ public class HiveAlterHandler implements AlterHandler {
}
alterTableUpdateTableColumnStats(msdb, oldt, newt);
- if (transactionalListeners != null && transactionalListeners.size() > 0) {
- AlterTableEvent alterTableEvent = new AlterTableEvent(oldt, newt, true, handler);
- alterTableEvent.setEnvironmentContext(environmentContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterTable(alterTableEvent);
- }
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventMessage.EventType.ALTER_TABLE,
+ new AlterTableEvent(oldt, newt, true, handler),
+ environmentContext);
}
// commit the changes
success = msdb.commitTransaction();
@@ -381,13 +381,13 @@ public class HiveAlterHandler implements AlterHandler {
updatePartColumnStats(msdb, dbname, name, new_part.getValues(), new_part);
msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
- if (transactionalListeners != null && transactionalListeners.size() > 0) {
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
- alterPartitionEvent.setEnvironmentContext(environmentContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterPartition(alterPartitionEvent);
- }
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventMessage.EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+ environmentContext);
+
+
}
success = msdb.commitTransaction();
} catch (InvalidObjectException e) {
@@ -499,13 +499,11 @@ public class HiveAlterHandler implements AlterHandler {
}
}
- if (transactionalListeners != null && transactionalListeners.size() > 0) {
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldPart, new_part, tbl, true, handler);
- alterPartitionEvent.setEnvironmentContext(environmentContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterPartition(alterPartitionEvent);
- }
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventMessage.EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(oldPart, new_part, tbl, true, handler),
+ environmentContext);
}
success = msdb.commitTransaction();
@@ -534,13 +532,11 @@ public class HiveAlterHandler implements AlterHandler {
try {
msdb.openTransaction();
msdb.alterPartition(dbname, name, new_part.getValues(), oldPart);
- if (transactionalListeners != null && transactionalListeners.size() > 0) {
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(new_part, oldPart, tbl, true, handler);
- alterPartitionEvent.setEnvironmentContext(environmentContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterPartition(alterPartitionEvent);
- }
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventMessage.EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(new_part, oldPart, tbl, success, handler),
+ environmentContext);
}
revertMetaDataTransaction = msdb.commitTransaction();
@@ -625,12 +621,10 @@ public class HiveAlterHandler implements AlterHandler {
"when invoking MetaStoreEventListener for alterPartitions event.");
}
- if (transactionalListeners != null && transactionalListeners.size() > 0) {
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldPart, newPart, tbl, true, handler);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterPartition(alterPartitionEvent);
- }
+ if (transactionalListeners != null && !transactionalListeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventMessage.EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(oldPart, newPart, tbl, true, handler));
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aa29cd9d/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 80b1e98..3aabe22 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -57,8 +57,11 @@ import java.util.regex.Pattern;
import javax.jdo.JDOException;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimaps;
import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -112,6 +115,7 @@ import org.apache.hadoop.hive.metastore.events.PreLoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -151,10 +155,6 @@ import com.facebook.fb303.fb_status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -869,6 +869,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path dbPath = new Path(db.getLocationUri());
boolean success = false;
boolean madeDir = false;
+ Map<String, String> transactionalListenersResponses = Collections.emptyMap();
try {
firePreEvent(new PreCreateDatabaseEvent(db, this));
if (!wh.isDir(dbPath)) {
@@ -881,11 +882,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.openTransaction();
ms.createDatabase(db);
- if (transactionalListeners.size() > 0) {
- CreateDatabaseEvent cde = new CreateDatabaseEvent(db, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onCreateDatabase(cde);
- }
+
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenersResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.CREATE_DATABASE,
+ new CreateDatabaseEvent(db, true, this));
}
success = ms.commitTransaction();
@@ -896,8 +898,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(dbPath, true);
}
}
- for (MetaStoreEventListener listener : listeners) {
- listener.onCreateDatabase(new CreateDatabaseEvent(db, success, this));
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.CREATE_DATABASE,
+ new CreateDatabaseEvent(db, success, this),
+ null,
+ transactionalListenersResponses);
}
}
}
@@ -1012,6 +1019,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Database db = null;
List<Path> tablePaths = new ArrayList<Path>();
List<Path> partitionPaths = new ArrayList<Path>();
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
db = ms.getDatabase(name);
@@ -1094,12 +1102,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (ms.dropDatabase(name)) {
- if (transactionalListeners.size() > 0) {
- DropDatabaseEvent dde = new DropDatabaseEvent(db, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropDatabase(dde);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_DATABASE,
+ new DropDatabaseEvent(db, true, this));
}
+
success = ms.commitTransaction();
}
} finally {
@@ -1121,8 +1130,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
// it is not a terrible thing even if the data is not deleted
}
- for (MetaStoreEventListener listener : listeners) {
- listener.onDropDatabase(new DropDatabaseEvent(db, success, this));
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_DATABASE,
+ new DropDatabaseEvent(db, success, this),
+ null,
+ transactionalListenerResponses);
}
}
}
@@ -1380,6 +1394,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
Path tblPath = null;
boolean success = false, madeDir = false;
try {
@@ -1440,12 +1455,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.createTableWithConstraints(tbl, primaryKeys, foreignKeys);
}
- if (transactionalListeners.size() > 0) {
- CreateTableEvent createTableEvent = new CreateTableEvent(tbl, true, this);
- createTableEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onCreateTable(createTableEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.CREATE_TABLE,
+ new CreateTableEvent(tbl, true, this),
+ envContext);
}
success = ms.commitTransaction();
@@ -1456,11 +1471,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(tblPath, true);
}
}
- for (MetaStoreEventListener listener : listeners) {
- CreateTableEvent createTableEvent =
- new CreateTableEvent(tbl, success, this);
- createTableEvent.setEnvironmentContext(envContext);
- listener.onCreateTable(createTableEvent);
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.CREATE_TABLE,
+ new CreateTableEvent(tbl, success, this),
+ envContext,
+ transactionalListenerResponses);
}
}
}
@@ -1625,6 +1642,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Path> partPaths = null;
Table tbl = null;
boolean ifPurge = false;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
// drop any partitions
@@ -1678,12 +1696,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
throw new MetaException(indexName == null ? "Unable to drop table " + tableName:
"Unable to drop index table " + tableName + " for index " + indexName);
} else {
- if (transactionalListeners.size() > 0) {
- DropTableEvent dropTableEvent = new DropTableEvent(tbl, true, deleteData, this);
- dropTableEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropTable(dropTableEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_TABLE,
+ new DropTableEvent(tbl, deleteData, true, this),
+ envContext);
}
success = ms.commitTransaction();
}
@@ -1698,10 +1716,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
deleteTableData(tblPath, ifPurge);
// ok even if the data is not deleted
}
- for (MetaStoreEventListener listener : listeners) {
- DropTableEvent dropTableEvent = new DropTableEvent(tbl, success, deleteData, this);
- dropTableEvent.setEnvironmentContext(envContext);
- listener.onDropTable(dropTableEvent);
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_TABLE,
+ new DropTableEvent(tbl, deleteData, success, this),
+ envContext,
+ transactionalListenerResponses);
}
}
return success;
@@ -2165,6 +2186,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false, madeDir = false;
Path partLocation = null;
Table tbl = null;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
part.setDbName(dbName);
@@ -2221,12 +2243,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (ms.addPartition(part)) {
- if (transactionalListeners.size() > 0) {
- AddPartitionEvent addPartitionEvent = new AddPartitionEvent(tbl, part, true, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAddPartition(addPartitionEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, part, true, this),
+ envContext);
}
success = ms.commitTransaction();
@@ -2239,11 +2261,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- for (MetaStoreEventListener listener : listeners) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(tbl, part, success, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- listener.onAddPartition(addPartitionEvent);
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, part, success, this),
+ envContext,
+ transactionalListenerResponses);
}
}
return part;
@@ -2388,8 +2411,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
final Map<PartValEqWrapper, Boolean> addedPartitions =
Collections.synchronizedMap(new HashMap<PartValEqWrapper, Boolean>());
final List<Partition> newParts = new ArrayList<Partition>();
- final List<Partition> existingParts = new ArrayList<Partition>();;
+ final List<Partition> existingParts = new ArrayList<Partition>();
Table tbl = null;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+
try {
ms.openTransaction();
tbl = ms.getTable(dbName, tblName);
@@ -2475,7 +2500,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
success = false;
// Notification is generated for newly created partitions only. The subset of partitions
// that already exist (existingParts), will not generate notifications.
- fireMetaStoreAddPartitionEventTransactional(tbl, newParts, null, true);
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, newParts, true, this));
+ }
+
success = ms.commitTransaction();
} finally {
if (!success) {
@@ -2486,12 +2517,26 @@ public class HiveMetaStore extends ThriftHiveMetastore {
wh.deleteDir(new Path(e.getKey().partition.getSd().getLocation()), true);
}
}
- fireMetaStoreAddPartitionEvent(tbl, parts, null, false);
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, parts, false, this));
+ }
} else {
- fireMetaStoreAddPartitionEvent(tbl, newParts, null, true);
- if (existingParts != null) {
- // The request has succeeded but we failed to add these partitions.
- fireMetaStoreAddPartitionEvent(tbl, existingParts, null, false);
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, newParts, true, this),
+ null,
+ transactionalListenerResponses);
+
+ if (!existingParts.isEmpty()) {
+ // The request has succeeded but we failed to add these partitions.
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, existingParts, false, this));
+ }
}
}
}
@@ -2578,6 +2623,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
final PartitionSpecProxy.PartitionIterator partitionIterator = partitionSpecProxy
.getPartitionIterator();
Table tbl = null;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
tbl = ms.getTable(dbName, tblName);
@@ -2651,7 +2697,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
success = ms.addPartitions(dbName, tblName, partitionSpecProxy, ifNotExists);
//setting success to false to make sure that if the listener fails, rollback happens.
success = false;
- fireMetaStoreAddPartitionEventTransactional(tbl, partitionSpecProxy, null, true);
+
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, partitionSpecProxy, true, this));
+ }
+
success = ms.commitTransaction();
return addedPartitions.size();
} finally {
@@ -2664,7 +2717,14 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
}
- fireMetaStoreAddPartitionEvent(tbl, partitionSpecProxy, null, true);
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, partitionSpecProxy, true, this),
+ null,
+ transactionalListenerResponses);
+ }
}
}
@@ -2769,6 +2829,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
boolean success = false;
Table tbl = null;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
tbl = ms.getTable(part.getDbName(), part.getTableName());
@@ -2793,7 +2854,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Setting success to false to make sure that if the listener fails, rollback happens.
success = false;
- fireMetaStoreAddPartitionEventTransactional(tbl, Arrays.asList(part), envContext, true);
+
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, Arrays.asList(part), true, this),
+ envContext);
+
+ }
+
// we proceed only if we'd actually succeeded anyway, otherwise,
// we'd have thrown an exception
success = ms.commitTransaction();
@@ -2801,64 +2871,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!success) {
ms.rollbackTransaction();
}
- fireMetaStoreAddPartitionEvent(tbl, Arrays.asList(part), envContext, success);
- }
- return part;
- }
-
- private void fireMetaStoreAddPartitionEvent(final Table tbl,
- final List<Partition> parts, final EnvironmentContext envContext, boolean success)
- throws MetaException {
- if (tbl != null && parts != null && !parts.isEmpty()) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(tbl, parts, success, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener listener : listeners) {
- listener.onAddPartition(addPartitionEvent);
- }
- }
- }
- private void fireMetaStoreAddPartitionEvent(final Table tbl,
- final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
- throws MetaException {
- if (tbl != null && partitionSpec != null) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(tbl, partitionSpec, success, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener listener : listeners) {
- listener.onAddPartition(addPartitionEvent);
- }
- }
- }
-
- private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
- final List<Partition> parts, final EnvironmentContext envContext, boolean success)
- throws MetaException {
- if (tbl != null && parts != null && !parts.isEmpty()) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(tbl, parts, success, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAddPartition(addPartitionEvent);
- }
- }
- }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(tbl, Arrays.asList(part), success, this),
+ envContext,
+ transactionalListenerResponses);
- private void fireMetaStoreAddPartitionEventTransactional(final Table tbl,
- final PartitionSpecProxy partitionSpec, final EnvironmentContext envContext, boolean success)
- throws MetaException {
- if (tbl != null && partitionSpec != null) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(tbl, partitionSpec, success, this);
- addPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAddPartition(addPartitionEvent);
}
}
+ return part;
}
-
@Override
public Partition add_partition(final Partition part)
throws InvalidObjectException, AlreadyExistsException, MetaException {
@@ -2941,6 +2966,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path destPath = new Path(destinationTable.getSd().getLocation(),
Warehouse.makePartName(partitionKeysPresent, partValsPresent));
List<Partition> destPartitions = new ArrayList<Partition>();
+
+ Map<String, String> transactionalListenerResponsesForAddPartition = Collections.emptyMap();
+ List<Map<String, String>> transactionalListenerResponsesForDropPartition =
+ Lists.newArrayListWithCapacity(partitionsToExchange.size());
+
try {
for (Partition partition: partitionsToExchange) {
Partition destPartition = new Partition(partition);
@@ -2968,8 +2998,22 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Setting success to false to make sure that if the listener fails, rollback happens.
success = false;
- fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
- destinationTable, destPartitions, transactionalListeners, true);
+
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponsesForAddPartition =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ADD_PARTITION,
+ new AddPartitionEvent(destinationTable, destPartitions, true, this));
+
+ for (Partition partition : partitionsToExchange) {
+ DropPartitionEvent dropPartitionEvent =
+ new DropPartitionEvent(sourceTable, partition, true, true, this);
+ transactionalListenerResponsesForDropPartition.add(
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_PARTITION,
+ dropPartitionEvent));
+ }
+ }
success = ms.commitTransaction();
return destPartitions;
@@ -2979,34 +3023,31 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (pathCreated) {
wh.renameDir(destPath, sourcePath);
}
-
- fireMetaStoreExchangePartitionEvent(sourceTable, partitionsToExchange,
- destinationTable, destPartitions, listeners, success);
}
- }
- }
- private void fireMetaStoreExchangePartitionEvent(Table sourceTable,
- List<Partition> partitionsToExchange, Table destinationTable,
- List<Partition> destPartitions,
- List<MetaStoreEventListener> eventListeners,
- boolean status) throws MetaException {
- if (sourceTable != null && destinationTable != null
- && !CollectionUtils.isEmpty(partitionsToExchange)
- && !CollectionUtils.isEmpty(destPartitions)) {
- if (eventListeners.size() > 0) {
- AddPartitionEvent addPartitionEvent =
- new AddPartitionEvent(destinationTable, destPartitions, status, this);
- for (MetaStoreEventListener eventListener : eventListeners) {
- eventListener.onAddPartition(addPartitionEvent);
- }
+ if (!listeners.isEmpty()) {
+ AddPartitionEvent addPartitionEvent = new AddPartitionEvent(destinationTable, destPartitions, success, this);
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ADD_PARTITION,
+ addPartitionEvent,
+ null,
+ transactionalListenerResponsesForAddPartition);
+ i = 0;
for (Partition partition : partitionsToExchange) {
DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(sourceTable, partition, true, status, this);
- for (MetaStoreEventListener eventListener : eventListeners) {
- eventListener.onDropPartition(dropPartitionEvent);
- }
+ new DropPartitionEvent(sourceTable, partition, success, true, this);
+ Map<String, String> parameters =
+ (transactionalListenerResponsesForDropPartition.size() > i)
+ ? transactionalListenerResponsesForDropPartition.get(i)
+ : null;
+
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_PARTITION,
+ dropPartitionEvent,
+ null,
+ parameters);
+ i++;
}
}
}
@@ -3024,6 +3065,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Path archiveParentDir = null;
boolean mustPurge = false;
boolean isExternalTbl = false;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
@@ -3056,13 +3098,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
if (!ms.dropPartition(db_name, tbl_name, part_vals)) {
throw new MetaException("Unable to drop partition");
} else {
- if (transactionalListeners.size() > 0) {
- DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(tbl, part, true, deleteData, this);
- dropPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropPartition(dropPartitionEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_PARTITION,
+ new DropPartitionEvent(tbl, part, true, deleteData, this),
+ envContext);
}
success = ms.commitTransaction();
}
@@ -3090,11 +3132,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// ok even if the data is not deleted
}
}
- for (MetaStoreEventListener listener : listeners) {
- DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(tbl, part, success, deleteData, this);
- dropPartitionEvent.setEnvironmentContext(envContext);
- listener.onDropPartition(dropPartitionEvent);
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_PARTITION,
+ new DropPartitionEvent(tbl, part, success, deleteData, this),
+ envContext,
+ transactionalListenerResponses);
}
}
return true;
@@ -3156,6 +3199,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
List<Partition> parts = null;
boolean mustPurge = false;
boolean isExternalTbl = false;
+ List<Map<String, String>> transactionalListenerResponses = Lists.newArrayList();
+
try {
// We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
// Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
@@ -3239,14 +3284,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
ms.dropPartitions(dbName, tblName, partNames);
- if (parts != null && transactionalListeners.size() > 0) {
+ if (parts != null && !transactionalListeners.isEmpty()) {
for (Partition part : parts) {
- DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(tbl, part, true, deleteData, this);
- dropPartitionEvent.setEnvironmentContext(envContext);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropPartition(dropPartitionEvent);
- }
+ transactionalListenerResponses.add(
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_PARTITION,
+ new DropPartitionEvent(tbl, part, true, deleteData, this),
+ envContext));
}
}
@@ -3280,12 +3324,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
if (parts != null) {
- for (Partition part : parts) {
- for (MetaStoreEventListener listener : listeners) {
- DropPartitionEvent dropPartitionEvent =
- new DropPartitionEvent(tbl, part, success, deleteData, this);
- dropPartitionEvent.setEnvironmentContext(envContext);
- listener.onDropPartition(dropPartitionEvent);
+ int i = 0;
+ if (parts != null && !listeners.isEmpty()) {
+ for (Partition part : parts) {
+ Map<String, String> parameters =
+ (!transactionalListenerResponses.isEmpty()) ? transactionalListenerResponses.get(i) : null;
+
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_PARTITION,
+ new DropPartitionEvent(tbl, part, success, deleteData, this),
+ envContext,
+ parameters);
+
+ i++;
}
}
}
@@ -3720,14 +3771,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// Only fetch the table if we actually have a listener
Table table = null;
- for (MetaStoreEventListener listener : listeners) {
+ if (!listeners.isEmpty()) {
if (table == null) {
table = getMS().getTable(db_name, tbl_name);
}
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldPart, new_part, table, true, this);
- alterPartitionEvent.setEnvironmentContext(envContext);
- listener.onAlterPartition(alterPartitionEvent);
+
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(oldPart, new_part, table, true, this),
+ envContext);
}
} catch (InvalidObjectException e) {
ex = e;
@@ -3791,13 +3843,15 @@ public class HiveMetaStore extends ThriftHiveMetastore {
else {
throw new InvalidOperationException("failed to alterpartitions");
}
- for (MetaStoreEventListener listener : listeners) {
- if (table == null) {
- table = getMS().getTable(db_name, tbl_name);
- }
- AlterPartitionEvent alterPartitionEvent =
- new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this);
- listener.onAlterPartition(alterPartitionEvent);
+
+ if (table == null) {
+ table = getMS().getTable(db_name, tbl_name);
+ }
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ALTER_PARTITION,
+ new AlterPartitionEvent(oldTmpPart, tmpPart, table, true, this));
}
}
} catch (InvalidObjectException e) {
@@ -3834,16 +3888,17 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Exception ex = null;
Index oldIndex = null;
RawStore ms = getMS();
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
oldIndex = get_index_by_name(dbname, base_table_name, index_name);
firePreEvent(new PreAlterIndexEvent(oldIndex, newIndex, this));
ms.alterIndex(dbname, base_table_name, index_name, newIndex);
- if (transactionalListeners.size() > 0) {
- AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAlterIndex(alterIndexEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.ALTER_INDEX,
+ new AlterIndexEvent(oldIndex, newIndex, true, this));
}
success = ms.commitTransaction();
@@ -3865,9 +3920,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
endFunction("alter_index", success, ex, base_table_name);
- for (MetaStoreEventListener listener : listeners) {
- AlterIndexEvent alterIndexEvent = new AlterIndexEvent(oldIndex, newIndex, success, this);
- listener.onAlterIndex(alterIndexEvent);
+
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ALTER_INDEX,
+ new AlterIndexEvent(oldIndex, newIndex, success, this),
+ null,
+ transactionalListenerResponses);
}
}
}
@@ -3935,11 +3994,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
alterHandler.alterTable(getMS(), wh, dbname, name, newTable,
envContext, this);
success = true;
- for (MetaStoreEventListener listener : listeners) {
- AlterTableEvent alterTableEvent =
- new AlterTableEvent(oldt, newTable, success, this);
- alterTableEvent.setEnvironmentContext(envContext);
- listener.onAlterTable(alterTableEvent);
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.ALTER_TABLE,
+ new AlterTableEvent(oldt, newTable, true, this),
+ envContext);
}
} catch (NoSuchObjectException e) {
// thrown when the table to be altered does not exist
@@ -4506,6 +4565,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false, indexTableCreated = false;
String[] qualified =
MetaStoreUtils.getQualifiedName(index.getDbName(), index.getIndexTableName());
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
firePreEvent(new PreAddIndexEvent(index, this));
@@ -4543,11 +4603,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
index.setCreateTime((int) time);
index.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
if (ms.addIndex(index)) {
- if (transactionalListeners.size() > 0) {
- AddIndexEvent addIndexEvent = new AddIndexEvent(index, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onAddIndex(addIndexEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.CREATE_INDEX,
+ new AddIndexEvent(index, true, this));
}
}
@@ -4564,9 +4624,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.rollbackTransaction();
}
- for (MetaStoreEventListener listener : listeners) {
- AddIndexEvent addIndexEvent = new AddIndexEvent(index, success, this);
- listener.onAddIndex(addIndexEvent);
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.CREATE_INDEX,
+ new AddIndexEvent(index, success, this),
+ null,
+ transactionalListenerResponses);
}
}
}
@@ -4604,6 +4667,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
Index index = null;
Path tblPath = null;
List<Path> partPaths = null;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
// drop the underlying index table
@@ -4636,11 +4700,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- if (transactionalListeners.size() > 0) {
- DropIndexEvent dropIndexEvent = new DropIndexEvent(index, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropIndex(dropIndexEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_INDEX,
+ new DropIndexEvent(index, true, this));
}
success = ms.commitTransaction();
@@ -4653,11 +4717,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
// ok even if the data is not deleted
}
// Skip the event listeners if the index is NULL
- if (index != null) {
- for (MetaStoreEventListener listener : listeners) {
- DropIndexEvent dropIndexEvent = new DropIndexEvent(index, success, this);
- listener.onDropIndex(dropIndexEvent);
- }
+ if (index != null && !listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_INDEX,
+ new DropIndexEvent(index, success, this),
+ null,
+ transactionalListenerResponses);
}
}
return success;
@@ -6093,6 +6158,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
validateFunctionInfo(func);
boolean success = false;
RawStore ms = getMS();
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
Database db = ms.getDatabase(func.getDbName());
@@ -6109,11 +6175,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
long time = System.currentTimeMillis() / 1000;
func.setCreateTime((int) time);
ms.createFunction(func);
- if (transactionalListeners.size() > 0) {
- CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onCreateFunction(createFunctionEvent);
- }
+ if (!transactionalListeners.isEmpty()) {
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.CREATE_FUNCTION,
+ new CreateFunctionEvent(func, true, this));
}
success = ms.commitTransaction();
@@ -6122,11 +6188,12 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.rollbackTransaction();
}
- if (listeners.size() > 0) {
- CreateFunctionEvent createFunctionEvent = new CreateFunctionEvent(func, success, this);
- for (MetaStoreEventListener listener : listeners) {
- listener.onCreateFunction(createFunctionEvent);
- }
+ if (!listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.CREATE_FUNCTION,
+ new CreateFunctionEvent(func, success, this),
+ null,
+ transactionalListenerResponses);
}
}
}
@@ -6138,6 +6205,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
boolean success = false;
Function func = null;
RawStore ms = getMS();
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
try {
ms.openTransaction();
func = ms.getFunction(dbName, funcName);
@@ -6147,10 +6215,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
ms.dropFunction(dbName, funcName);
if (transactionalListeners.size() > 0) {
- DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, true, this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onDropFunction(dropFunctionEvent);
- }
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
+ EventType.DROP_FUNCTION,
+ new DropFunctionEvent(func, true, this));
}
success = ms.commitTransaction();
@@ -6160,10 +6228,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
if (listeners.size() > 0) {
- DropFunctionEvent dropFunctionEvent = new DropFunctionEvent(func, success, this);
- for (MetaStoreEventListener listener : listeners) {
- listener.onDropFunction(dropFunctionEvent);
- }
+ MetaStoreListenerNotifier.notifyEvent(listeners,
+ EventType.DROP_FUNCTION,
+ new DropFunctionEvent(func, success, this),
+ null,
+ transactionalListenerResponses);
}
}
}
@@ -6530,13 +6599,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
InsertEvent event =
new InsertEvent(rqst.getDbName(), rqst.getTableName(), rqst.getPartitionVals(), rqst
.getData().getInsertData(), rqst.isSuccessful(), this);
- for (MetaStoreEventListener transactionalListener : transactionalListeners) {
- transactionalListener.onInsert(event);
- }
- for (MetaStoreEventListener listener : listeners) {
- listener.onInsert(event);
- }
+ /*
+ * The transactional listener response will be set already on the event, so there is not need
+ * to pass the response to the non-transactional listener.
+ */
+ MetaStoreListenerNotifier.notifyEvent(transactionalListeners, EventType.INSERT, event);
+ MetaStoreListenerNotifier.notifyEvent(listeners, EventType.INSERT, event);
return new FireEventResponse();