You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/10/23 12:27:39 UTC

hive git commit: HIVE-20542: Incremental REPL DUMP progress information log message is incorrect (Ashutosh Bapat, reviewed by Sankar Hariappan)

Repository: hive
Updated Branches:
  refs/heads/master 7765e90aa -> 0d4d03fd1


HIVE-20542: Incremental REPL DUMP progress information log message is incorrect (Ashutosh Bapat, reviewed by Sankar Hariappan)

Signed-off-by: Sankar Hariappan <sa...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0d4d03fd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0d4d03fd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0d4d03fd

Branch: refs/heads/master
Commit: 0d4d03fd1daeb3b75182b73f7b40de7a3b7d48ea
Parents: 7765e90
Author: Ashutosh Bapat <ab...@hortonworks.com>
Authored: Tue Oct 23 17:56:47 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Oct 23 17:56:47 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  75 ++++++-
 .../listener/TestDbNotificationListener.java    |  59 ++++++
 .../TestReplicationScenariosAcidTables.java     |   6 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |  17 ++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   9 +-
 .../hive/ql/metadata/events/EventUtils.java     |  16 +-
 .../api/NotificationEventsCountRequest.java     | 206 ++++++++++++++++++-
 .../src/gen/thrift/gen-php/metastore/Types.php  |  46 +++++
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  28 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   6 +-
 .../src/main/thrift/hive_metastore.thrift       |   4 +-
 .../hadoop/hive/metastore/ObjectStore.java      |  58 +++++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |   2 +-
 13 files changed, 503 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index c23aab2..fe101d3 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -939,19 +940,71 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       long nextNLId = getNextNLId(stmt, sqlGenerator,
               "org.apache.hadoop.hive.metastore.model.MNotificationLog");
 
-      String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," +
-              quoteString(" ") + ",?, ?)";
+      String insertVal;
+      String columns;
+      List<String> params = new ArrayList<String>();
+
+      // Construct the values string, parameters and column string step by step simultaneously so
+      // that the positions of columns and of their corresponding values do not go out of sync.
+
+      // Notification log id
+      columns = "\"NL_ID\"";
+      insertVal = "" + nextNLId;
+
+      // Event id
+      columns = columns + ", \"EVENT_ID\"";
+      insertVal = insertVal + "," + nextEventId;
+
+      // Event time
+      columns = columns + ", \"EVENT_TIME\"";
+      insertVal = insertVal + "," + now();
+
+      // Event type
+      columns = columns + ", \"EVENT_TYPE\"";
+      insertVal = insertVal + ", ?";
+      params.add(event.getEventType());
+
+      // Message
+      columns = columns + ", \"MESSAGE\"";
+      insertVal = insertVal + ", ?";
+      params.add(event.getMessage());
+
+      // Message format
+      columns = columns + ", \"MESSAGE_FORMAT\"";
+      insertVal = insertVal + ", ?";
+      params.add(event.getMessageFormat());
+
+      // Database name, optional
+      String dbName = event.getDbName();
+      if (dbName != null) {
+        assert dbName.equals(dbName.toLowerCase());
+        columns = columns + ", \"DB_NAME\"";
+        insertVal = insertVal + ", ?";
+        params.add(dbName);
+      }
 
-      s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
-              " \"EVENT_TYPE\", \"DB_NAME\", " +
-              " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal;
-      List<String> params = Arrays.asList(
-              event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat());
-      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      // Table name, optional
+      String tableName = event.getTableName();
+      if (tableName != null) {
+        assert tableName.equals(tableName.toLowerCase());
+        columns = columns + ", \"TBL_NAME\"";
+        insertVal = insertVal + ", ?";
+        params.add(tableName);
+      }
 
-      LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
-              quoteString(event.getEventType()), quoteString(event.getDbName()),
-              quoteString(event.getMessage()), quoteString(event.getMessageFormat()));
+      // Catalog name, optional
+      String catName = event.getCatName();
+      if (catName != null) {
+        assert catName.equals(catName.toLowerCase());
+        columns = columns + ", \"CAT_NAME\"";
+        insertVal = insertVal + ", ?";
+        params.add(catName);
+      }
+
+      s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + insertVal + ")";
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute insert <" + s + "> with parameters (" +
+              String.join(", ", params) + ")");
       pst.execute();
 
       // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index dc555a4..3e404df 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -300,6 +301,21 @@ public class TestDbNotificationListener {
     MockMetaStoreEventListener.clearEvents();
   }
 
+  // Verify that the number of events between the given event ids for the given database name is
+  // the same as expected. toEventId = 0 is treated as unbounded; the same holds for limit 0.
+  private void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
+                               long expectedCount) throws Exception {
+    NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);
+
+    if (toEventId != null) {
+      rqst.setToEventId(toEventId);
+    }
+    if (limit != null) {
+      rqst.setLimit(limit);
+    }
+
+    assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount());
+  }
 
   @Test
   public void createDatabase() throws Exception {
@@ -341,6 +357,10 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
+
+    // There's only one event corresponding to CREATE DATABASE
+    testEventCounts(dbName, firstEventId, null, null, 1);
+    testEventCounts(dbName2, firstEventId, null, null, 0);
   }
 
   @Test
@@ -358,6 +378,7 @@ public class TestDbNotificationListener {
 
     // Two events: one for create db and other for drop db
     assertEquals(2, rsp.getEventsSize());
+    testEventCounts(dbName, firstEventId, null, null, 2);
 
     // Read event from notification
     NotificationEvent event = rsp.getEvents().get(1);
@@ -388,6 +409,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
+    testEventCounts(dbName2, firstEventId, null, null, 1);
   }
 
   @Test
@@ -443,6 +465,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 1);
   }
 
   @Test
@@ -501,6 +524,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 2);
   }
 
   @Test
@@ -567,6 +591,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
   @Test
@@ -636,6 +661,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(2, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 2);
   }
 
   @Test
@@ -704,6 +730,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
   @Test
@@ -778,6 +805,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(4, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 4);
   }
 
   @Test
@@ -873,6 +901,7 @@ public class TestDbNotificationListener {
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3);
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+    testEventCounts(dbName, firstEventId, null, null, 5);
   }
 
   @Test
@@ -931,6 +960,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(1, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 1);
   }
 
   @Test
@@ -985,6 +1015,7 @@ public class TestDbNotificationListener {
     }
     rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(3, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
   @Test
@@ -1040,6 +1071,7 @@ public class TestDbNotificationListener {
     // Verify the eventID was passed to the non-transactional listener
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2);
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+    testEventCounts(defaultDbName, firstEventId, null, null, 2);
   }
 
   @Test
@@ -1106,6 +1138,7 @@ public class TestDbNotificationListener {
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
     MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+    testEventCounts(defaultDbName, firstEventId, null, null, 3);
   }
 
 
@@ -1200,10 +1233,12 @@ public class TestDbNotificationListener {
     event = rsp.getEvents().get(5);
     assertEquals(firstEventId + 6, event.getEventId());
     assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
+    testEventCounts(defaultDbName, firstEventId, null, null, 6);
   }
 
   @Test
   public void sqlCTAS() throws Exception {
+    String defaultDbName = "default";
     String sourceTblName = "sqlctasins1";
     String targetTblName = "sqlctasins2";
     // Event 1
@@ -1229,10 +1264,12 @@ public class TestDbNotificationListener {
     event = rsp.getEvents().get(4);
     assertEquals(firstEventId + 5, event.getEventId());
     assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+    testEventCounts(defaultDbName, firstEventId, null, null, 6);
   }
 
   @Test
   public void sqlTempTable() throws Exception {
+    String defaultDbName = "default";
     String tempTblName = "sqltemptbl";
     driver.run("create temporary table " + tempTblName + "  (c int)");
     driver.run("insert into table " + tempTblName + " values (1)");
@@ -1240,6 +1277,7 @@ public class TestDbNotificationListener {
     // Get notifications from metastore
     NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
     assertEquals(0, rsp.getEventsSize());
+    testEventCounts(defaultDbName, firstEventId, null, null, 0);
   }
 
   @Test
@@ -1263,6 +1301,7 @@ public class TestDbNotificationListener {
 
   @Test
   public void sqlInsertPartition() throws Exception {
+    String defaultDbName = "default";
     String tblName = "sqlinsptn";
     // Event 1
     driver.run("create table " + tblName + " (c int) partitioned by (ds string)");
@@ -1274,6 +1313,13 @@ public class TestDbNotificationListener {
     driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
     // Event 9, 10
     driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
+
+    testEventCounts(defaultDbName, firstEventId, null, null, 10);
+    // Test a limit higher than available events
+    testEventCounts(defaultDbName, firstEventId, null, 100, 10);
+    // Test toEventId lower than current eventId
+    testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5);
+
     // Event 10, 11, 12
     driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
     // Event 12, 13, 14
@@ -1340,6 +1386,9 @@ public class TestDbNotificationListener {
     assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
 
+    // Test fromEventId different from the very first
+    testEventCounts(defaultDbName, event.getEventId(), null, null, 3);
+
     event = rsp.getEvents().get(21);
     assertEquals(firstEventId + 22, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
@@ -1355,6 +1404,16 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 24, event.getEventId());
     assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
+    testEventCounts(defaultDbName, firstEventId, null, null, 24);
+
+    // Test a limit within the available events
+    testEventCounts(defaultDbName, firstEventId, null, 10, 10);
+    // Test toEventId greater than current eventId
+    testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24);
+    // Test toEventId greater than current eventId with some limit within available events
+    testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10);
+    // Test toEventId greater than current eventId with some limit beyond available events
+    testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24);
   }
 
   private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 4ceb9fa..af65d6a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -482,7 +482,6 @@ public class TestReplicationScenariosAcidTables {
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
-    // create table will start and coomit the transaction
     primary.run("use " + primaryDbName)
            .run("CREATE TABLE " + tableName +
             " (key int, value int) PARTITIONED BY (load_date date) " +
@@ -495,6 +494,11 @@ public class TestReplicationScenariosAcidTables {
 
     WarehouseInstance.Tuple incrementalDump =
             primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+
+    long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId);
+    primary.testEventCounts(primaryDbName, lastReplId, null, null, 20);
+
+    // Test load
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId);

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index aae7bd7..7900779 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
 import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
@@ -416,6 +417,22 @@ public class WarehouseInstance implements Closeable {
     return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
   }
 
+  // Verify that the number of events between the given event ids for the given database name is
+  // the same as expected. toEventId = 0 is treated as unbounded; the same holds for limit 0.
+  public void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
+                               long expectedCount) throws Exception {
+    NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);
+
+    if (toEventId != null) {
+      rqst.setToEventId(toEventId);
+    }
+    if (limit != null) {
+      rqst.setLimit(limit);
+    }
+
+    assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount());
+  }
+
   @Override
   public void close() throws IOException {
     if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index c75bde5..28d61f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -163,11 +163,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
 
     lastReplId = work.eventTo;
+
+    // Right now the only pattern allowed to be specified is *, which matches all the database
+    // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter
+    // on database name when it's *. In future, if we support more elaborate patterns, we will
+    // have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get
+    // correct event count.
     String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
         ? work.dbNameOrPattern
         : "?";
     replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
-            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName));
+            evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
+                    work.maxEventLimit()));
     replLogger.startLog();
     while (evIter.hasNext()) {
       NotificationEvent ev = evIter.next();

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
index 66abd51..f925271 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
@@ -36,7 +36,8 @@ public class EventUtils {
   public interface NotificationFetcher {
     int getBatchSize() throws IOException;
     long getCurrentNotificationEventId() throws IOException;
-    long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
+    long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId,
+                                      int limit) throws IOException;
     List<NotificationEvent> getNextNotificationEvents(
         long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
   }
@@ -78,10 +79,21 @@ public class EventUtils {
     }
 
     @Override
-    public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
+    public long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId,
+                                             int limit) throws IOException {
       try {
+        // The number of events is always bounded by limit, so a non-positive limit results
+        // in no events being counted.
+        if (limit <= 0) {
+          return 0;
+        }
+
         NotificationEventsCountRequest rqst
                 = new NotificationEventsCountRequest(fromEventId, dbName);
+        if (toEventId != null) {
+          rqst.setToEventId(toEventId);
+        }
+        rqst.setLimit(limit);
         return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount();
       } catch (TException e) {
         throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
index a4a5218..95af1a4 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField FROM_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fromEventId", org.apache.thrift.protocol.TType.I64, (short)1);
   private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)2);
   private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)3);
+  private static final org.apache.thrift.protocol.TField TO_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("toEventId", org.apache.thrift.protocol.TType.I64, (short)4);
+  private static final org.apache.thrift.protocol.TField LIMIT_FIELD_DESC = new org.apache.thrift.protocol.TField("limit", org.apache.thrift.protocol.TType.I64, (short)5);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -51,12 +53,16 @@ import org.slf4j.LoggerFactory;
   private long fromEventId; // required
   private String dbName; // required
   private String catName; // optional
+  private long toEventId; // optional
+  private long limit; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     FROM_EVENT_ID((short)1, "fromEventId"),
     DB_NAME((short)2, "dbName"),
-    CAT_NAME((short)3, "catName");
+    CAT_NAME((short)3, "catName"),
+    TO_EVENT_ID((short)4, "toEventId"),
+    LIMIT((short)5, "limit");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -77,6 +83,10 @@ import org.slf4j.LoggerFactory;
           return DB_NAME;
         case 3: // CAT_NAME
           return CAT_NAME;
+        case 4: // TO_EVENT_ID
+          return TO_EVENT_ID;
+        case 5: // LIMIT
+          return LIMIT;
         default:
           return null;
       }
@@ -118,8 +128,10 @@ import org.slf4j.LoggerFactory;
 
   // isset id assignments
   private static final int __FROMEVENTID_ISSET_ID = 0;
+  private static final int __TOEVENTID_ISSET_ID = 1;
+  private static final int __LIMIT_ISSET_ID = 2;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.CAT_NAME};
+  private static final _Fields optionals[] = {_Fields.CAT_NAME,_Fields.TO_EVENT_ID,_Fields.LIMIT};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -129,6 +141,10 @@ import org.slf4j.LoggerFactory;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.CAT_NAME, new org.apache.thrift.meta_data.FieldMetaData("catName", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.TO_EVENT_ID, new org.apache.thrift.meta_data.FieldMetaData("toEventId", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.LIMIT, new org.apache.thrift.meta_data.FieldMetaData("limit", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEventsCountRequest.class, metaDataMap);
   }
@@ -158,6 +174,8 @@ import org.slf4j.LoggerFactory;
     if (other.isSetCatName()) {
       this.catName = other.catName;
     }
+    this.toEventId = other.toEventId;
+    this.limit = other.limit;
   }
 
   public NotificationEventsCountRequest deepCopy() {
@@ -170,6 +188,10 @@ import org.slf4j.LoggerFactory;
     this.fromEventId = 0;
     this.dbName = null;
     this.catName = null;
+    setToEventIdIsSet(false);
+    this.toEventId = 0;
+    setLimitIsSet(false);
+    this.limit = 0;
   }
 
   public long getFromEventId() {
@@ -240,6 +262,50 @@ import org.slf4j.LoggerFactory;
     }
   }
 
+  public long getToEventId() {
+    return this.toEventId;
+  }
+
+  public void setToEventId(long toEventId) {
+    this.toEventId = toEventId;
+    setToEventIdIsSet(true);
+  }
+
+  public void unsetToEventId() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TOEVENTID_ISSET_ID);
+  }
+
+  /** Returns true if field toEventId is set (has been assigned a value) and false otherwise */
+  public boolean isSetToEventId() {
+    return EncodingUtils.testBit(__isset_bitfield, __TOEVENTID_ISSET_ID);
+  }
+
+  public void setToEventIdIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TOEVENTID_ISSET_ID, value);
+  }
+
+  public long getLimit() {
+    return this.limit;
+  }
+
+  public void setLimit(long limit) {
+    this.limit = limit;
+    setLimitIsSet(true);
+  }
+
+  public void unsetLimit() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __LIMIT_ISSET_ID);
+  }
+
+  /** Returns true if field limit is set (has been assigned a value) and false otherwise */
+  public boolean isSetLimit() {
+    return EncodingUtils.testBit(__isset_bitfield, __LIMIT_ISSET_ID);
+  }
+
+  public void setLimitIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __LIMIT_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case FROM_EVENT_ID:
@@ -266,6 +332,22 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case TO_EVENT_ID:
+      if (value == null) {
+        unsetToEventId();
+      } else {
+        setToEventId((Long)value);
+      }
+      break;
+
+    case LIMIT:
+      if (value == null) {
+        unsetLimit();
+      } else {
+        setLimit((Long)value);
+      }
+      break;
+
     }
   }
 
@@ -280,6 +362,12 @@ import org.slf4j.LoggerFactory;
     case CAT_NAME:
       return getCatName();
 
+    case TO_EVENT_ID:
+      return getToEventId();
+
+    case LIMIT:
+      return getLimit();
+
     }
     throw new IllegalStateException();
   }
@@ -297,6 +385,10 @@ import org.slf4j.LoggerFactory;
       return isSetDbName();
     case CAT_NAME:
       return isSetCatName();
+    case TO_EVENT_ID:
+      return isSetToEventId();
+    case LIMIT:
+      return isSetLimit();
     }
     throw new IllegalStateException();
   }
@@ -341,6 +433,24 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_toEventId = true && this.isSetToEventId();
+    boolean that_present_toEventId = true && that.isSetToEventId();
+    if (this_present_toEventId || that_present_toEventId) {
+      if (!(this_present_toEventId && that_present_toEventId))
+        return false;
+      if (this.toEventId != that.toEventId)
+        return false;
+    }
+
+    boolean this_present_limit = true && this.isSetLimit();
+    boolean that_present_limit = true && that.isSetLimit();
+    if (this_present_limit || that_present_limit) {
+      if (!(this_present_limit && that_present_limit))
+        return false;
+      if (this.limit != that.limit)
+        return false;
+    }
+
     return true;
   }
 
@@ -363,6 +473,16 @@ import org.slf4j.LoggerFactory;
     if (present_catName)
       list.add(catName);
 
+    boolean present_toEventId = true && (isSetToEventId());
+    list.add(present_toEventId);
+    if (present_toEventId)
+      list.add(toEventId);
+
+    boolean present_limit = true && (isSetLimit());
+    list.add(present_limit);
+    if (present_limit)
+      list.add(limit);
+
     return list.hashCode();
   }
 
@@ -404,6 +524,26 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetToEventId()).compareTo(other.isSetToEventId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetToEventId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.toEventId, other.toEventId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetLimit()).compareTo(other.isSetLimit());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetLimit()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.limit, other.limit);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -445,6 +585,18 @@ import org.slf4j.LoggerFactory;
       }
       first = false;
     }
+    if (isSetToEventId()) {
+      if (!first) sb.append(", ");
+      sb.append("toEventId:");
+      sb.append(this.toEventId);
+      first = false;
+    }
+    if (isSetLimit()) {
+      if (!first) sb.append(", ");
+      sb.append("limit:");
+      sb.append(this.limit);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -522,6 +674,22 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 4: // TO_EVENT_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.toEventId = iprot.readI64();
+              struct.setToEventIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 5: // LIMIT
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.limit = iprot.readI64();
+              struct.setLimitIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -550,6 +718,16 @@ import org.slf4j.LoggerFactory;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetToEventId()) {
+        oprot.writeFieldBegin(TO_EVENT_ID_FIELD_DESC);
+        oprot.writeI64(struct.toEventId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.isSetLimit()) {
+        oprot.writeFieldBegin(LIMIT_FIELD_DESC);
+        oprot.writeI64(struct.limit);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -573,10 +751,22 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetCatName()) {
         optionals.set(0);
       }
-      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetToEventId()) {
+        optionals.set(1);
+      }
+      if (struct.isSetLimit()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
       if (struct.isSetCatName()) {
         oprot.writeString(struct.catName);
       }
+      if (struct.isSetToEventId()) {
+        oprot.writeI64(struct.toEventId);
+      }
+      if (struct.isSetLimit()) {
+        oprot.writeI64(struct.limit);
+      }
     }
 
     @Override
@@ -586,11 +776,19 @@ import org.slf4j.LoggerFactory;
       struct.setFromEventIdIsSet(true);
       struct.dbName = iprot.readString();
       struct.setDbNameIsSet(true);
-      BitSet incoming = iprot.readBitSet(1);
+      BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         struct.catName = iprot.readString();
         struct.setCatNameIsSet(true);
       }
+      if (incoming.get(1)) {
+        struct.toEventId = iprot.readI64();
+        struct.setToEventIdIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.limit = iprot.readI64();
+        struct.setLimitIsSet(true);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 775c9d9..5fd5d78 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -22619,6 +22619,14 @@ class NotificationEventsCountRequest {
    * @var string
    */
   public $catName = null;
+  /**
+   * @var int
+   */
+  public $toEventId = null;
+  /**
+   * @var int
+   */
+  public $limit = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -22635,6 +22643,14 @@ class NotificationEventsCountRequest {
           'var' => 'catName',
           'type' => TType::STRING,
           ),
+        4 => array(
+          'var' => 'toEventId',
+          'type' => TType::I64,
+          ),
+        5 => array(
+          'var' => 'limit',
+          'type' => TType::I64,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -22647,6 +22663,12 @@ class NotificationEventsCountRequest {
       if (isset($vals['catName'])) {
         $this->catName = $vals['catName'];
       }
+      if (isset($vals['toEventId'])) {
+        $this->toEventId = $vals['toEventId'];
+      }
+      if (isset($vals['limit'])) {
+        $this->limit = $vals['limit'];
+      }
     }
   }
 
@@ -22690,6 +22712,20 @@ class NotificationEventsCountRequest {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 4:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->toEventId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
+        case 5:
+          if ($ftype == TType::I64) {
+            $xfer += $input->readI64($this->limit);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -22718,6 +22754,16 @@ class NotificationEventsCountRequest {
       $xfer += $output->writeString($this->catName);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->toEventId !== null) {
+      $xfer += $output->writeFieldBegin('toEventId', TType::I64, 4);
+      $xfer += $output->writeI64($this->toEventId);
+      $xfer += $output->writeFieldEnd();
+    }
+    if ($this->limit !== null) {
+      $xfer += $output->writeFieldBegin('limit', TType::I64, 5);
+      $xfer += $output->writeI64($this->limit);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 9d3885c..03c2a4e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15814,6 +15814,8 @@ class NotificationEventsCountRequest:
    - fromEventId
    - dbName
    - catName
+   - toEventId
+   - limit
   """
 
   thrift_spec = (
@@ -15821,12 +15823,16 @@ class NotificationEventsCountRequest:
     (1, TType.I64, 'fromEventId', None, None, ), # 1
     (2, TType.STRING, 'dbName', None, None, ), # 2
     (3, TType.STRING, 'catName', None, None, ), # 3
+    (4, TType.I64, 'toEventId', None, None, ), # 4
+    (5, TType.I64, 'limit', None, None, ), # 5
   )
 
-  def __init__(self, fromEventId=None, dbName=None, catName=None,):
+  def __init__(self, fromEventId=None, dbName=None, catName=None, toEventId=None, limit=None,):
     self.fromEventId = fromEventId
     self.dbName = dbName
     self.catName = catName
+    self.toEventId = toEventId
+    self.limit = limit
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15852,6 +15858,16 @@ class NotificationEventsCountRequest:
           self.catName = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.I64:
+          self.toEventId = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.I64:
+          self.limit = iprot.readI64()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -15874,6 +15890,14 @@ class NotificationEventsCountRequest:
       oprot.writeFieldBegin('catName', TType.STRING, 3)
       oprot.writeString(self.catName)
       oprot.writeFieldEnd()
+    if self.toEventId is not None:
+      oprot.writeFieldBegin('toEventId', TType.I64, 4)
+      oprot.writeI64(self.toEventId)
+      oprot.writeFieldEnd()
+    if self.limit is not None:
+      oprot.writeFieldBegin('limit', TType.I64, 5)
+      oprot.writeI64(self.limit)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -15890,6 +15914,8 @@ class NotificationEventsCountRequest:
     value = (value * 31) ^ hash(self.fromEventId)
     value = (value * 31) ^ hash(self.dbName)
     value = (value * 31) ^ hash(self.catName)
+    value = (value * 31) ^ hash(self.toEventId)
+    value = (value * 31) ^ hash(self.limit)
     return value
 
   def __repr__(self):

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 26b89c0..2eea181 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3518,11 +3518,15 @@ class NotificationEventsCountRequest
   FROMEVENTID = 1
   DBNAME = 2
   CATNAME = 3
+  TOEVENTID = 4
+  LIMIT = 5
 
   FIELDS = {
     FROMEVENTID => {:type => ::Thrift::Types::I64, :name => 'fromEventId'},
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
-    CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true}
+    CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true},
+    TOEVENTID => {:type => ::Thrift::Types::I64, :name => 'toEventId', :optional => true},
+    LIMIT => {:type => ::Thrift::Types::I64, :name => 'limit', :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index a2a6740..4b7b615 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1147,7 +1147,9 @@ struct CurrentNotificationEventId {
 struct NotificationEventsCountRequest {
     1: required i64 fromEventId,
     2: required string dbName,
-    3: optional string catName
+    3: optional string catName,
+    4: optional i64 toEventId,
+    5: optional i64 limit
 }
 
 struct NotificationEventsCountResponse {

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index ddd64e7..9c15804 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10260,14 +10260,60 @@ public class ObjectStore implements RawStore, Configurable {
       long fromEventId = rqst.getFromEventId();
       String inputDbName = rqst.getDbName();
       String catName = rqst.isSetCatName() ? rqst.getCatName() : getDefaultCatalog(conf);
-      String queryStr = "select count(eventId) from " + MNotificationLog.class.getName()
-                + " where eventId > fromEventId && dbName == inputDbName && catalogName == catName";
+      long toEventId;
+      String paramSpecs;
+      List<Object> paramVals = new ArrayList<Object>();
+
+      // Catalog names are stored in lower case in the metastore and are used in that form
+      // everywhere in Hive.
+      assert catName.equals(catName.toLowerCase());
+
+      // Build the query to count events, part by part
+      String queryStr = "select count(eventId) from " + MNotificationLog.class.getName();
+      // Count only events with an id greater than fromEventId (exclusive lower bound).
+      queryStr = queryStr + " where eventId > fromEventId";
+      paramSpecs = "java.lang.Long fromEventId";
+      paramVals.add(Long.valueOf(fromEventId));
+
+      // Input database name can be a database name or a *. In the first case we add a filter
+      // condition on dbName column, but not in the second case, since a * means all the
+      // databases. In case we support more elaborate database name patterns in future, we will
+      // have to apply a method similar to getNextNotification() method of MetaStoreClient.
+      if (!inputDbName.equals("*")) {
+        // dbName could be NULL in case of transaction related events, which also need to be
+        // counted.
+        queryStr = queryStr + " && (dbName == inputDbName || dbName == null)";
+        paramSpecs = paramSpecs + ", java.lang.String inputDbName";
+        // We store a database name in lower case in metastore.
+        paramVals.add(inputDbName.toLowerCase());
+      }
+
+      // catName could be NULL in case of transaction related events, which also need to be
+      // counted.
+      queryStr = queryStr + " && (catalogName == catName || catalogName == null)";
+      paramSpecs = paramSpecs +", java.lang.String catName";
+      paramVals.add(catName);
+
+      // Count events up to and including toEventId, if specified.
+      if (rqst.isSetToEventId()) {
+        toEventId = rqst.getToEventId();
+        queryStr = queryStr + " && eventId <= toEventId";
+        paramSpecs = paramSpecs + ", java.lang.Long toEventId";
+        paramVals.add(Long.valueOf(toEventId));
+      }
+
       query = pm.newQuery(queryStr);
-      query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName," +
-          " java.lang.String catName");
-      result = (Long) query.execute(fromEventId, inputDbName, catName);
+      query.declareParameters(paramSpecs);
+      result = (Long) query.executeWithArray(paramVals.toArray());
       commited = commitTransaction();
-      return new NotificationEventsCountResponse(result.longValue());
+
+      // Cap the event count by limit if specified.
+      long  eventCount = result.longValue();
+      if (rqst.isSetLimit() && eventCount > rqst.getLimit()) {
+        eventCount = rqst.getLimit();
+      }
+
+      return new NotificationEventsCountResponse(eventCount);
     } finally {
       rollbackAndCleanup(commited, query);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 351fafd..0bb739f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1652,7 +1652,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
                   EventMessage.EventType.ALLOC_WRITE_ID,
-                  new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null),
+                  new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null),
                   dbConn, sqlGenerator);
         }