You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2015/01/21 00:50:55 UTC

svn commit: r1653408 - in /hive/trunk: hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/ itests/hcatalog-unit/ itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/

Author: khorgath
Date: Tue Jan 20 23:50:55 2015
New Revision: 1653408

URL: http://svn.apache.org/r1653408
Log:
HIVE-9184 : Modify HCatClient to support new notification methods in HiveMetaStoreClient (Alan Gates via Sushanth Sowmyan)

Added:
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
    hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/
    hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
Modified:
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
    hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
    hive/trunk/itests/hcatalog-unit/pom.xml

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClient.java Tue Jan 20 23:50:55 2015
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hive.hcatalog.common.HCatException;
@@ -467,6 +468,31 @@ public abstract class HCatClient {
   public abstract String getMessageBusTopicName(String dbName, String tableName) throws HCatException;
 
   /**
+   * Get a list of notifications
+   * @param lastEventId The last event id that was consumed by this reader.  The returned
+   *                    notifications will start at the next eventId available this eventId that
+   *                    matches the filter.
+   * @param maxEvents Maximum number of events to return.  If < 1, then all available events will
+   *                  be returned.
+   * @param filter Filter to determine if message should be accepted.  If null, then all
+   *               available events up to maxEvents will be returned.
+   * @return list of notifications, sorted by eventId.  It is guaranteed that the events are in
+   * the order that the operations were done on the database.
+   * @throws HCatException
+   */
+  public abstract List<HCatNotificationEvent> getNextNotification(long lastEventId,
+                                                                  int maxEvents,
+                                                                  IMetaStoreClient.NotificationFilter filter)
+      throws HCatException;
+
+  /**
+   * Get the most recently used notification id.
+   * @return
+   * @throws HCatException
+   */
+  public abstract long getCurrentNotificationEventId() throws HCatException;
+
+  /**
    * Close the hcatalog client.
    *
    * @throws HCatException

Modified: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java (original)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatClientHMSImpl.java Tue Jan 20 23:50:55 2015
@@ -23,20 +23,25 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PartitionEventType;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -825,7 +830,8 @@ public class HCatClientHMSImpl extends H
   @Override
   public String getMessageBusTopicName(String dbName, String tableName) throws HCatException {
     try {
-      return hmsClient.getTable(dbName, tableName).getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+      return hmsClient.getTable(dbName, tableName).getParameters().get(
+          HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
     }
     catch (MetaException e) {
       throw new HCatException("MetaException while retrieving JMS Topic name.", e);
@@ -836,7 +842,36 @@ public class HCatClientHMSImpl extends H
           "TException while retrieving JMS Topic name.", e);
     }
   }
-  
+
+  @Override
+  public List<HCatNotificationEvent> getNextNotification(long lastEventId, int maxEvents,
+                                                         IMetaStoreClient.NotificationFilter filter)
+      throws HCatException {
+    try {
+      List<HCatNotificationEvent> events = new ArrayList<HCatNotificationEvent>();
+      NotificationEventResponse rsp = hmsClient.getNextNotification(lastEventId, maxEvents, filter);
+      if (rsp != null && rsp.getEvents() != null) {
+        for (NotificationEvent event : rsp.getEvents()) {
+          events.add(new HCatNotificationEvent(event));
+        }
+      }
+      return events;
+    } catch (TException e) {
+      throw new ConnectionFailureException("TException while getting notifications", e);
+    }
+  }
+
+  @Override
+  public long getCurrentNotificationEventId() throws HCatException {
+    try {
+      CurrentNotificationEventId id = hmsClient.getCurrentNotificationEventId();
+      return id.getEventId();
+    } catch (TException e) {
+      throw new ConnectionFailureException("TException while getting current notification event " +
+          "id " , e);
+    }
+  }
+
   @Override
   public String serializeTable(HCatTable hcatTable) throws HCatException {
     return MetadataSerializer.get().serializeTable(hcatTable);
@@ -905,8 +940,10 @@ public class HCatClientHMSImpl extends H
 
   @Override
   public HCatPartitionSpec deserializePartitionSpec(List<String> hcatPartitionSpecStrings) throws HCatException {
-    HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get().deserializePartitionSpec(hcatPartitionSpecStrings);
-    hcatPartitionSpec.hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName()));
+    HCatPartitionSpec hcatPartitionSpec = MetadataSerializer.get()
+        .deserializePartitionSpec(hcatPartitionSpecStrings);
+    hcatPartitionSpec
+        .hcatTable(getTable(hcatPartitionSpec.getDbName(), hcatPartitionSpec.getTableName()));
     return hcatPartitionSpec;
   }
 }

Added: hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java?rev=1653408&view=auto
==============================================================================
--- hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java (added)
+++ hive/trunk/hcatalog/webhcat/java-client/src/main/java/org/apache/hive/hcatalog/api/HCatNotificationEvent.java Tue Jan 20 23:50:55 2015
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+
+/**
+ * A wrapper class for {@link org.apache.hadoop.hive.metastore.api.NotificationEvent},
+ * so that if that class changes we can still keep this one constant for backward compatibility
+ */
+public class HCatNotificationEvent {
+  private long eventId;
+  private int eventTime;
+  private String eventType;
+  private String dbName;
+  private String tableName;
+  private String message;
+
+  HCatNotificationEvent(NotificationEvent event) {
+    eventId = event.getEventId();
+    eventTime = event.getEventTime();
+    eventType = event.getEventType();
+    dbName = event.getDbName();
+    tableName = event.getTableName();
+    message = event.getMessage();
+  }
+
+  public long getEventId() {
+    return eventId;
+  }
+
+  public int getEventTime() {
+    return eventTime;
+  }
+
+  public String getEventType() {
+    return eventType;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append("eventId:");
+    buf.append(eventId);
+    buf.append(" eventTime:");
+    buf.append(eventTime);
+    buf.append(" eventType:<");
+    buf.append(eventType);
+    buf.append("> dbName:<");
+    buf.append(dbName);
+    buf.append("> tableName:<");
+    buf.append(tableName);
+    buf.append("> message:<");
+    buf.append(message);
+    buf.append(">");
+    return buf.toString();
+  }
+}

Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1653408&r1=1653407&r2=1653408&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Tue Jan 20 23:50:55 2015
@@ -66,6 +66,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-webhcat-java-client</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-hbase-handler</artifactId>
       <version>${project.version}</version>

Added: hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java?rev=1653408&view=auto
==============================================================================
--- hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java (added)
+++ hive/trunk/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java Tue Jan 20 23:50:55 2015
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hive.hcatalog.api;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNull;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
+import junit.framework.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hive.hcatalog.listener.DbNotificationListener;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This can't use TestHCatClient because it has to have control over certain conf variables when
+ * the metastore is started.  Plus, we don't need a metastore running in another thread.  The
+ * local one is fine.
+ */
+public class TestHCatClientNotification {
+
+  private static final Log LOG = LogFactory.getLog(TestHCatClientNotification.class.getName());
+  private static HCatClient hCatClient;
+  private int startTime;
+  private long firstEventId;
+
+  @BeforeClass
+  public static void setupClient() throws Exception {
+    HiveConf conf = new HiveConf(); conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
+        DbNotificationListener.class.getName());
+    hCatClient = HCatClient.create(conf);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    long now = System.currentTimeMillis() / 1000;
+    startTime = 0;
+    if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
+    else startTime = (int)now;
+    firstEventId = hCatClient.getCurrentNotificationEventId();
+  }
+
+  @Test
+  public void createDatabase() throws Exception {
+    hCatClient.createDatabase(HCatCreateDBDesc.create("myhcatdb").build());
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, events.size());
+
+    HCatNotificationEvent event = events.get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
+    assertNull(event.getDbName());
+    assertNull(event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"myhcatdb\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void dropDatabase() throws Exception {
+    String dbname = "hcatdropdb";
+    hCatClient.createDatabase(HCatCreateDBDesc.create(dbname).build());
+    hCatClient.dropDatabase(dbname, false, HCatClient.DropDBMode.RESTRICT);
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, events.size());
+
+    HCatNotificationEvent event = events.get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+    assertEquals(dbname, event.getDbName());
+    assertNull(event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"hcatdropdb\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void createTable() throws Exception {
+    String dbName = "default";
+    String tableName = "hcatcreatetable";
+    HCatTable table = new HCatTable(dbName, tableName);
+    table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+    hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, events.size());
+
+    HCatNotificationEvent event = events.get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertNull(event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}"));
+  }
+
+  // TODO - Currently no way to test alter table, as this interface doesn't support alter table
+
+  @Test
+  public void dropTable() throws Exception {
+    String dbName = "default";
+    String tableName = "hcatdroptable";
+    HCatTable table = new HCatTable(dbName, tableName);
+    table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+    hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+    hCatClient.dropTable(dbName, tableName, false);
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, events.size());
+
+    HCatNotificationEvent event = events.get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+    assertEquals(dbName, event.getDbName());
+    assertEquals(tableName, event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"hcatdroptable\",\"timestamp\":[0-9]+}"));
+  }
+
+  @Test
+  public void addPartition() throws Exception {
+    String dbName = "default";
+    String tableName = "hcataddparttable";
+    String partColName = "pc";
+    HCatTable table = new HCatTable(dbName, tableName);
+    table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, ""));
+    table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+    hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+    String partName = "testpart";
+    Map<String, String> partSpec = new HashMap<String, String>(1);
+    partSpec.put(partColName, partName);
+    hCatClient.addPartition(
+        HCatAddPartitionDesc.create(
+            new HCatPartition(table, partSpec, null)
+        ).build()
+    );
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(2, events.size());
+
+    HCatNotificationEvent event = events.get(1);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals(tableName, event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+  }
+
+  // TODO - currently no way to test alter partition, as HCatClient doesn't support it.
+  @Test
+  public void dropPartition() throws Exception {
+    String dbName = "default";
+    String tableName = "hcatdropparttable";
+    String partColName = "pc";
+    HCatTable table = new HCatTable(dbName, tableName);
+    table.partCol(new HCatFieldSchema(partColName, TypeInfoFactory.stringTypeInfo, ""));
+    table.cols(Arrays.asList(new HCatFieldSchema("onecol", TypeInfoFactory.stringTypeInfo, "")));
+    hCatClient.createTable(HCatCreateTableDesc.create(table).build());
+    String partName = "testpart";
+    Map<String, String> partSpec = new HashMap<String, String>(1);
+    partSpec.put(partColName, partName);
+    hCatClient.addPartition(
+        HCatAddPartitionDesc.create(
+            new HCatPartition(table, partSpec, null)
+        ).build()
+    );
+    hCatClient.dropPartitions(dbName, tableName, partSpec, false);
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(3, events.size());
+
+    HCatNotificationEvent event = events.get(2);
+    assertEquals(firstEventId + 3, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
+    assertEquals("default", event.getDbName());
+    assertEquals(tableName, event.getTableName());
+    assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
+        "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
+        "\"hcatdropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+  }
+
+  @Test
+  public void getOnlyMaxEvents() throws Exception {
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb1").build());
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb2").build());
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatdb3").build());
+
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 2, null);
+    assertEquals(2, events.size());
+    assertEquals(firstEventId + 1, events.get(0).getEventId());
+    assertEquals(firstEventId + 2, events.get(1).getEventId());
+  }
+
+  @Test
+  public void filter() throws Exception {
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf1").build());
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatf2").build());
+    hCatClient.dropDatabase("hcatf2", false, HCatClient.DropDBMode.RESTRICT);
+
+    IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+      }
+    };
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, filter);
+    assertEquals(1, events.size());
+    assertEquals(firstEventId + 3, events.get(0).getEventId());
+  }
+
+  @Test
+  public void filterWithMax() throws Exception {
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm1").build());
+    hCatClient.createDatabase(HCatCreateDBDesc.create("hcatm2").build());
+    hCatClient.dropDatabase("hcatm2", false, HCatClient.DropDBMode.RESTRICT);
+
+    IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+      }
+    };
+    List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 1, filter);
+    assertEquals(1, events.size());
+    assertEquals(firstEventId + 1, events.get(0).getEventId());
+  }
+}