You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2015/01/21 00:50:55 UTC
svn commit: r1653408 - in /hive/trunk:
hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/
itests/hcatalog-unit/
itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/
Author: khorgath
Date: Tue Jan 20 23:50:55 2015
New Revision: 1653408
URL: http://svn.apache.org/r1653408
Log:
HIVE-9184 : Modify HCatClient to support new notification methods in HiveMetaStoreClient (Alan Gates via Sushanth Sowmyan)
Added:
hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/
hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
Modified:
hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
hive/trunk/itests/hcatalog-unit/pom.xml
Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Tue Jan 20 23:50:55 2015
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hive.hcatalog.common.HCatException;
@@ -467,6 +468,31 @@ public abstract class HCatClient {
public abstract String getMessageBusTopicName(String dbName, String tableName) throws HCatException;
/**
+ * Get a list of notifications
+ * @param lastEventId The last event id that was consumed by this reader. The returned
+ * notifications will start at the next eventId available this eventId that
+ * matches the filter.
+ * @param maxEvents Maximum number of events to return. If < 1, then all available events will
+ * be returned.
+ * @param filter Filter to determine if message should be accepted. If null, then all
+ * available events up to maxEvents will be returned.
+ * @return list of notifications, sorted by eventId. It is guaranteed that the events are in
+ * the order that the operations were done on the database.
+ * @throws HCatException
+ */
+ public abstract List<HCatNotificationEvent> getNextNotification(long lastEventId,
+ int maxEvents,
+ IMetaStoreClient.NotificationFilter filter)
+ throws HCatException;
+
+ /**
+ * Get the most recently used notification id.
+ * @return
+ * @throws HCatException
+ */
+ public abstract long getCurrentNotificationEventId() throws HCatException;
+
+ /**
* Close the hcatalog client.
*
* @throws HCatException
Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Tue Jan 20 23:50:55 2015
@@ -23,20 +23,25 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -825,7 +830,8 @@ public class HCatClientHMSImpl extends H
@Override
public String getMessageBusTopicName(String dbName, String tableName) throws HCatException {
try {
- return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+ return hmsClient.getTable(dbName, tableName).getParameters().get(
+ HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
}
catch (MetaException e) {
throw new HCatException("MetaException while retrieving JMS Topic name.", e);
@@ -836,7 +842,36 @@ public class HCatClientHMSImpl extends H
"TException while retrieving JMS Topic name.", e);
}
}
-
+
+ @Override
+ public List<HCatNotificationEvent> getNextNotification(long lastEventId, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter)
+ throws HCatException {
+ try {
+ List<HCatNotificationEvent> events = new ArrayList<HCatNotificationEvent>();
+ NotificationEventResponse rsp = hmsClient.getNextNotification(lastEventId, maxEvents, filter);
+ if (rsp != null && rsp.getEvents() != null) {
+ for (NotificationEvent event : rsp.getEvents()) {
+ events.add(new HCatNotificationEvent(event));
+ }
+ }
+ return events;
+ } catch (TException e) {
+ throw new ConnectionFailureException("TException while getting notifications", e);
+ }
+ }
+
+ @Override
+ public long getCurrentNotificationEventId() throws HCatException {
+ try {
+ CurrentNotificationEventId id = hmsClient.getCurrentNotificationEventId();
+ return id.getEventId();
+ } catch (TException e) {
+ throw new ConnectionFailureException("TException while getting current notification event " +
+ "id " , e);
+ }
+ }
+
@Override
public String serializeTable(HCatTable hcatTable) throws HCatException {
return MetadataSerializer.get().serializeTable(hcatTable);
@@ -905,8 +940,10 @@ public class HCatClientHMSImpl extends H
@Override
public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
- HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings);
- hcatPartitionSpec.hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName()));
+ HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get()
+ .deserializePartitionSpec(hcatPartitionSpecStrings);
+ hcatPartitionSpec
+ .hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName()));
return hcatPartitionSpec;
}
}
Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java?rev=1653408&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java Tue Jan 20 23:50:55 2015
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * A wrapper class for {@link org.apache.hadoop.hive.metastore.api.NotificationEvent},
+ * so that if that class changes we can still keep this one constant for backward compatibility
+ */
+public class HCatNotificationEvent {
+ private long eventId;
+ private int eventTime;
+ private String eventType;
+ private String dbName;
+ private String tableName;
+ private String message;
+
+ HCatNotificationEvent(NotificationEvent event) {
+ eventId = event.getEventId();
+ eventTime = event.getEventTime();
+ eventType = event.getEventType();
+ dbName = event.getDbName();
+ tableName = event.getTableName();
+ message = event.getMessage();
+ }
+
+ public long getEventId() {
+ return eventId;
+ }
+
+ public int getEventTime() {
+ return eventTime;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append("eventId:");
+ buf.append(eventId);
+ buf.append(" eventTime:");
+ buf.append(eventTime);
+ buf.append(" eventType:<");
+ buf.append(eventType);
+ buf.append("> dbName:<");
+ buf.append(dbName);
+ buf.append("> tableName:<");
+ buf.append(tableName);
+ buf.append("> message:<");
+ buf.append(message);
+ buf.append(">");
+ return buf.toString();
+ }
+}
Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Tue Jan 20 23:50:55 2015
@@ -66,6 +66,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-hbase-handler</artifactId>
<version>${project.version}</version>
Added: hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java?rev=1653408&view=auto
==============================================================================
--- hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java (added)
+++ hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java Tue Jan 20 23:50:55 2015
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This can't use TestHCatClient because it has to have control over certain conf variables when
+ * the metastore is started. Plus, we don't need a metastore running in another thread. The
+ * local one is fine.
+ */
+public class TestHCatClientNotification {
+
+ private static final Log LOG = LogFactory.getLog(TestHCatClientNotification.class.getName());
+ private static HCatClient hCatClient;
+ private int startTime;
+ private long firstEventId;
+
+ @BeforeClass
+ public static void setupClient() throws Exception {
+ HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+ DbNotificationListener.class.getName());
+ hCatClient = HCatClient.create(conf);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ long now = System.currentTimeMillis() / 1000;
+ startTime = 0;
+ if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
+ else startTime = (int)now;
+ firstEventId = hCatClient.getCurrentNotificationEventId();
+ }
+
+ @Test
+ public void createDatabase() throws Exception {
+ hCatClient.createDatabase(HCatCreateDBDesc.create("myhcatdb").build());
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, events.size());
+
+ HCatNotificationEvent event = events.get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
+ assertNull(event.getDbName());
+ assertNull(event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"myhcatdb\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void dropDatabase() throws Exception {
+ String dbname = "hcatdropdb";
+ hCatClient.createDatabase(HCatCreateDBDesc.create(dbname).build());
+ hCatClient.dropDatabase(dbname, false, HCatClient.DropDBMode.RESTRICT);
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(2, events.size());
+
+ HCatNotificationEvent event = events.get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+ assertEquals(dbname, event.getDbName());
+ assertNull(event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"hcatdropdb\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void createTable() throws Exception {
+ String dbName = "default";
+ String tableName = "hcatcreatetable";
+ HCatTable table = new HCatTable(dbName, tableName);
+ table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+ hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, events.size());
+
+ HCatNotificationEvent event = events.get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+ assertEquals(dbName, event.getDbName());
+ assertNull(event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}"));
+ }
+
+ // TODO - Currently no way to test alter table, as this interface doesn't support alter table
+
+ @Test
+ public void dropTable() throws Exception {
+ String dbName = "default";
+ String tableName = "hcatdroptable";
+ HCatTable table = new HCatTable(dbName, tableName);
+ table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+ hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+ hCatClient.dropTable(dbName, tableName, false);
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(2, events.size());
+
+ HCatNotificationEvent event = events.get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+ assertEquals(dbName, event.getDbName());
+ assertEquals(tableName, event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+ "\"hcatdroptable\",\"timestamp\":[0-9]+}"));
+ }
+
+ @Test
+ public void addPartition() throws Exception {
+ String dbName = "default";
+ String tableName = "hcataddparttable";
+ String partColName = "pc";
+ HCatTable table = new HCatTable(dbName, tableName);
+ table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, ""));
+ table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+ hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+ String partName = "testpart";
+ Map<String, String> partSpec = new HashMap<String, String>(1);
+ partSpec.put(partColName, partName);
+ hCatClient.addPartition(
+ HCatAddPartitionDesc.create(
+ new HCatPartition(table, partSpec, null)
+ ).build()
+ );
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(2, events.size());
+
+ HCatNotificationEvent event = events.get(1);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals(tableName, event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+ "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+ }
+
+ // TODO - currently no way to test alter partition, as HCatClient doesn't support it.
+ @Test
+ public void dropPartition() throws Exception {
+ String dbName = "default";
+ String tableName = "hcatdropparttable";
+ String partColName = "pc";
+ HCatTable table = new HCatTable(dbName, tableName);
+ table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, ""));
+ table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+ hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+ String partName = "testpart";
+ Map<String, String> partSpec = new HashMap<String, String>(1);
+ partSpec.put(partColName, partName);
+ hCatClient.addPartition(
+ HCatAddPartitionDesc.create(
+ new HCatPartition(table, partSpec, null)
+ ).build()
+ );
+ hCatClient.dropPartitions(dbName, tableName, partSpec, false);
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(3, events.size());
+
+ HCatNotificationEvent event = events.get(2);
+ assertEquals(firstEventId + 3, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
+ assertEquals("default", event.getDbName());
+ assertEquals(tableName, event.getTableName());
+ assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
+ "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+ "\"hcatdropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+ }
+
+ @Test
+ public void getOnlyMaxEvents() throws Exception {
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb1").build());
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb2").build());
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb3").build());
+
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 2, null);
+ assertEquals(2, events.size());
+ assertEquals(firstEventId + 1, events.get(0).getEventId());
+ assertEquals(firstEventId + 2, events.get(1).getEventId());
+ }
+
+ @Test
+ public void filter() throws Exception {
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf1").build());
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf2").build());
+ hCatClient.dropDatabase("hcatf2", false, HCatClient.DropDBMode.RESTRICT);
+
+ IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ }
+ };
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, filter);
+ assertEquals(1, events.size());
+ assertEquals(firstEventId + 3, events.get(0).getEventId());
+ }
+
+ @Test
+ public void filterWithMax() throws Exception {
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm1").build());
+ hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm2").build());
+ hCatClient.dropDatabase("hcatm2", false, HCatClient.DropDBMode.RESTRICT);
+
+ IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+ }
+ };
+ List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 1, filter);
+ assertEquals(1, events.size());
+ assertEquals(firstEventId + 1, events.get(0).getEventId());
+ }
+}