You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/10/23 12:27:39 UTC
hive git commit: HIVE-20542: Incremental REPL DUMP progress
information log message is incorrect (Ashutosh Bapat,
reviewed by Sankar Hariappan)
Repository: hive
Updated Branches:
refs/heads/master 7765e90aa -> 0d4d03fd1
HIVE-20542: Incremental REPL DUMP progress information log message is incorrect (Ashutosh Bapat, reviewed by Sankar Hariappan)
Signed-off-by: Sankar Hariappan <sa...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0d4d03fd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0d4d03fd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0d4d03fd
Branch: refs/heads/master
Commit: 0d4d03fd1daeb3b75182b73f7b40de7a3b7d48ea
Parents: 7765e90
Author: Ashutosh Bapat <ab...@hortonworks.com>
Authored: Tue Oct 23 17:56:47 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Oct 23 17:56:47 2018 +0530
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 75 ++++++-
.../listener/TestDbNotificationListener.java | 59 ++++++
.../TestReplicationScenariosAcidTables.java | 6 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 17 ++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 9 +-
.../hive/ql/metadata/events/EventUtils.java | 16 +-
.../api/NotificationEventsCountRequest.java | 206 ++++++++++++++++++-
.../src/gen/thrift/gen-php/metastore/Types.php | 46 +++++
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 28 ++-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 6 +-
.../src/main/thrift/hive_metastore.thrift | 4 +-
.../hadoop/hive/metastore/ObjectStore.java | 58 +++++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 2 +-
13 files changed, 503 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index c23aab2..fe101d3 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -939,19 +940,71 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
long nextNLId = getNextNLId(stmt, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MNotificationLog");
- String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," +
- quoteString(" ") + ",?, ?)";
+ String insertVal;
+ String columns;
+ List<String> params = new ArrayList<String>();
+
+ // Construct the values string, parameters and column string step by step simultaneously so
+ // that the positions of columns and of their corresponding values do not go out of sync.
+
+ // Notification log id
+ columns = "\"NL_ID\"";
+ insertVal = "" + nextNLId;
+
+ // Event id
+ columns = columns + ", \"EVENT_ID\"";
+ insertVal = insertVal + "," + nextEventId;
+
+ // Event time
+ columns = columns + ", \"EVENT_TIME\"";
+ insertVal = insertVal + "," + now();
+
+ // Event type
+ columns = columns + ", \"EVENT_TYPE\"";
+ insertVal = insertVal + ", ?";
+ params.add(event.getEventType());
+
+ // Message
+ columns = columns + ", \"MESSAGE\"";
+ insertVal = insertVal + ", ?";
+ params.add(event.getMessage());
+
+ // Message format
+ columns = columns + ", \"MESSAGE_FORMAT\"";
+ insertVal = insertVal + ", ?";
+ params.add(event.getMessageFormat());
+
+ // Database name, optional
+ String dbName = event.getDbName();
+ if (dbName != null) {
+ assert dbName.equals(dbName.toLowerCase());
+ columns = columns + ", \"DB_NAME\"";
+ insertVal = insertVal + ", ?";
+ params.add(dbName);
+ }
- s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
- " \"EVENT_TYPE\", \"DB_NAME\", " +
- " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal;
- List<String> params = Arrays.asList(
- event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat());
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ // Table name, optional
+ String tableName = event.getTableName();
+ if (tableName != null) {
+ assert tableName.equals(tableName.toLowerCase());
+ columns = columns + ", \"TBL_NAME\"";
+ insertVal = insertVal + ", ?";
+ params.add(tableName);
+ }
- LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
- quoteString(event.getEventType()), quoteString(event.getDbName()),
- quoteString(event.getMessage()), quoteString(event.getMessageFormat()));
+ // Catalog name, optional
+ String catName = event.getCatName();
+ if (catName != null) {
+ assert catName.equals(catName.toLowerCase());
+ columns = columns + ", \"CAT_NAME\"";
+ insertVal = insertVal + ", ?";
+ params.add(catName);
+ }
+
+ s = "insert into \"NOTIFICATION_LOG\" (" + columns + ") VALUES (" + insertVal + ")";
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute insert <" + s + "> with parameters (" +
+ String.join(", ", params) + ")");
pst.execute();
// Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index dc555a4..3e404df 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
@@ -300,6 +301,21 @@ public class TestDbNotificationListener {
MockMetaStoreEventListener.clearEvents();
}
+ // Test if the number of events between the given event ids and with the given database name are
+ // same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0.
+ private void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
+ long expectedCount) throws Exception {
+ NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);
+
+ if (toEventId != null) {
+ rqst.setToEventId(toEventId);
+ }
+ if (limit != null) {
+ rqst.setLimit(limit);
+ }
+
+ assertEquals(expectedCount, msClient.getNotificationEventsCount(rqst).getEventsCount());
+ }
@Test
public void createDatabase() throws Exception {
@@ -341,6 +357,10 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
+
+ // There's only one event corresponding to CREATE DATABASE
+ testEventCounts(dbName, firstEventId, null, null, 1);
+ testEventCounts(dbName2, firstEventId, null, null, 0);
}
@Test
@@ -358,6 +378,7 @@ public class TestDbNotificationListener {
// Two events: one for create db and other for drop db
assertEquals(2, rsp.getEventsSize());
+ testEventCounts(dbName, firstEventId, null, null, 2);
// Read event from notification
NotificationEvent event = rsp.getEvents().get(1);
@@ -388,6 +409,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
+ testEventCounts(dbName2, firstEventId, null, null, 1);
}
@Test
@@ -443,6 +465,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 1);
}
@Test
@@ -501,6 +524,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 2);
}
@Test
@@ -567,6 +591,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 3);
}
@Test
@@ -636,6 +661,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 2);
}
@Test
@@ -704,6 +730,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 3);
}
@Test
@@ -778,6 +805,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 4);
}
@Test
@@ -873,6 +901,7 @@ public class TestDbNotificationListener {
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 3);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+ testEventCounts(dbName, firstEventId, null, null, 5);
}
@Test
@@ -931,6 +960,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 1);
}
@Test
@@ -985,6 +1015,7 @@ public class TestDbNotificationListener {
}
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 3);
}
@Test
@@ -1040,6 +1071,7 @@ public class TestDbNotificationListener {
// Verify the eventID was passed to the non-transactional listener
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+ testEventCounts(defaultDbName, firstEventId, null, null, 2);
}
@Test
@@ -1106,6 +1138,7 @@ public class TestDbNotificationListener {
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.INSERT, firstEventId + 3);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.ADD_PARTITION, firstEventId + 2);
MockMetaStoreEventListener.popAndVerifyLastEventId(EventType.CREATE_TABLE, firstEventId + 1);
+ testEventCounts(defaultDbName, firstEventId, null, null, 3);
}
@@ -1200,10 +1233,12 @@ public class TestDbNotificationListener {
event = rsp.getEvents().get(5);
assertEquals(firstEventId + 6, event.getEventId());
assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
+ testEventCounts(defaultDbName, firstEventId, null, null, 6);
}
@Test
public void sqlCTAS() throws Exception {
+ String defaultDbName = "default";
String sourceTblName = "sqlctasins1";
String targetTblName = "sqlctasins2";
// Event 1
@@ -1229,10 +1264,12 @@ public class TestDbNotificationListener {
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+ testEventCounts(defaultDbName, firstEventId, null, null, 6);
}
@Test
public void sqlTempTable() throws Exception {
+ String defaultDbName = "default";
String tempTblName = "sqltemptbl";
driver.run("create temporary table " + tempTblName + " (c int)");
driver.run("insert into table " + tempTblName + " values (1)");
@@ -1240,6 +1277,7 @@ public class TestDbNotificationListener {
// Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(0, rsp.getEventsSize());
+ testEventCounts(defaultDbName, firstEventId, null, null, 0);
}
@Test
@@ -1263,6 +1301,7 @@ public class TestDbNotificationListener {
@Test
public void sqlInsertPartition() throws Exception {
+ String defaultDbName = "default";
String tblName = "sqlinsptn";
// Event 1
driver.run("create table " + tblName + " (c int) partitioned by (ds string)");
@@ -1274,6 +1313,13 @@ public class TestDbNotificationListener {
driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
// Event 9, 10
driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
+
+ testEventCounts(defaultDbName, firstEventId, null, null, 10);
+ // Test a limit higher than available events
+ testEventCounts(defaultDbName, firstEventId, null, 100, 10);
+ // Test toEventId lower than current eventId
+ testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 5, null, 5);
+
// Event 10, 11, 12
driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
// Event 12, 13, 14
@@ -1340,6 +1386,9 @@ public class TestDbNotificationListener {
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
+ // Test fromEventId different from the very first
+ testEventCounts(defaultDbName, event.getEventId(), null, null, 3);
+
event = rsp.getEvents().get(21);
assertEquals(firstEventId + 22, event.getEventId());
assertEquals(EventType.INSERT.toString(), event.getEventType());
@@ -1355,6 +1404,16 @@ public class TestDbNotificationListener {
assertEquals(firstEventId + 24, event.getEventId());
assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
+ testEventCounts(defaultDbName, firstEventId, null, null, 24);
+
+ // Test a limit within the available events
+ testEventCounts(defaultDbName, firstEventId, null, 10, 10);
+ // Test toEventId greater than current eventId
+ testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, null, 24);
+ // Test toEventId greater than current eventId with some limit within available events
+ testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 10, 10);
+ // Test toEventId greater than current eventId with some limit beyond available events
+ testEventCounts(defaultDbName, firstEventId, (long) firstEventId + 100, 50, 24);
}
private void verifyInsert(NotificationEvent event, String dbName, String tblName) throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 4ceb9fa..af65d6a 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -482,7 +482,6 @@ public class TestReplicationScenariosAcidTables {
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
- // create table will start and coomit the transaction
primary.run("use " + primaryDbName)
.run("CREATE TABLE " + tableName +
" (key int, value int) PARTITIONED BY (load_date date) " +
@@ -495,6 +494,11 @@ public class TestReplicationScenariosAcidTables {
WarehouseInstance.Tuple incrementalDump =
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+
+ long lastReplId = Long.parseLong(bootStrapDump.lastReplicationId);
+ primary.testEventCounts(primaryDbName, lastReplId, null, null, 20);
+
+ // Test load
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
.verifyResult(incrementalDump.lastReplicationId);
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index aae7bd7..7900779 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
@@ -416,6 +417,22 @@ public class WarehouseInstance implements Closeable {
return new ReplicationV1CompatRule(client, hiveConf, testsToSkip);
}
+ // Test if the number of events between the given event ids and with the given database name are
+ // same as expected. toEventId = 0 is treated as unbounded. Same is the case with limit 0.
+ public void testEventCounts(String dbName, long fromEventId, Long toEventId, Integer limit,
+ long expectedCount) throws Exception {
+ NotificationEventsCountRequest rqst = new NotificationEventsCountRequest(fromEventId, dbName);
+
+ if (toEventId != null) {
+ rqst.setToEventId(toEventId);
+ }
+ if (limit != null) {
+ rqst.setLimit(limit);
+ }
+
+ assertEquals(expectedCount, client.getNotificationEventsCount(rqst).getEventsCount());
+ }
+
@Override
public void close() throws IOException {
if (miniDFSCluster != null && miniDFSCluster.isClusterUp()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index c75bde5..28d61f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -163,11 +163,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
lastReplId = work.eventTo;
+
+ // Right now the only pattern allowed to be specified is *, which matches all the database
+ // names. So passing dbname as is works since getDbNotificationEventsCount can exclude filter
+ // on database name when it's *. In future, if we support more elaborate patterns, we will
+ // have to pass DatabaseAndTableFilter created above to getDbNotificationEventsCount() to get
+ // correct event count.
String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
? work.dbNameOrPattern
: "?";
replLogger = new IncrementalDumpLogger(dbName, dumpRoot.toString(),
- evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName));
+ evFetcher.getDbNotificationEventsCount(work.eventFrom, dbName, work.eventTo,
+ work.maxEventLimit()));
replLogger.startLog();
while (evIter.hasNext()) {
NotificationEvent ev = evIter.next();
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
index 66abd51..f925271 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/events/EventUtils.java
@@ -36,7 +36,8 @@ public class EventUtils {
public interface NotificationFetcher {
int getBatchSize() throws IOException;
long getCurrentNotificationEventId() throws IOException;
- long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException;
+ long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId,
+ int limit) throws IOException;
List<NotificationEvent> getNextNotificationEvents(
long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
}
@@ -78,10 +79,21 @@ public class EventUtils {
}
@Override
- public long getDbNotificationEventsCount(long fromEventId, String dbName) throws IOException {
+ public long getDbNotificationEventsCount(long fromEventId, String dbName, Long toEventId,
+ int limit) throws IOException {
try {
+ // Number of events is always bounded by limit, which when non-positive, will result
+ // in no events being counted..
+ if (limit <= 0) {
+ return 0;
+ }
+
NotificationEventsCountRequest rqst
= new NotificationEventsCountRequest(fromEventId, dbName);
+ if (toEventId != null) {
+ rqst.setToEventId(toEventId);
+ }
+ rqst.setLimit(limit);
return hiveDb.getMSC().getNotificationEventsCount(rqst).getEventsCount();
} catch (TException e) {
throw new IOException(e);
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
index a4a5218..95af1a4 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/NotificationEventsCountRequest.java
@@ -41,6 +41,8 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField FROM_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("fromEventId", org.apache.thrift.protocol.TType.I64, (short)1);
private static final org.apache.thrift.protocol.TField DB_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("dbName", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField CAT_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("catName", org.apache.thrift.protocol.TType.STRING, (short)3);
+ private static final org.apache.thrift.protocol.TField TO_EVENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("toEventId", org.apache.thrift.protocol.TType.I64, (short)4);
+ private static final org.apache.thrift.protocol.TField LIMIT_FIELD_DESC = new org.apache.thrift.protocol.TField("limit", org.apache.thrift.protocol.TType.I64, (short)5);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -51,12 +53,16 @@ import org.slf4j.LoggerFactory;
private long fromEventId; // required
private String dbName; // required
private String catName; // optional
+ private long toEventId; // optional
+ private long limit; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
FROM_EVENT_ID((short)1, "fromEventId"),
DB_NAME((short)2, "dbName"),
- CAT_NAME((short)3, "catName");
+ CAT_NAME((short)3, "catName"),
+ TO_EVENT_ID((short)4, "toEventId"),
+ LIMIT((short)5, "limit");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -77,6 +83,10 @@ import org.slf4j.LoggerFactory;
return DB_NAME;
case 3: // CAT_NAME
return CAT_NAME;
+ case 4: // TO_EVENT_ID
+ return TO_EVENT_ID;
+ case 5: // LIMIT
+ return LIMIT;
default:
return null;
}
@@ -118,8 +128,10 @@ import org.slf4j.LoggerFactory;
// isset id assignments
private static final int __FROMEVENTID_ISSET_ID = 0;
+ private static final int __TOEVENTID_ISSET_ID = 1;
+ private static final int __LIMIT_ISSET_ID = 2;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.CAT_NAME};
+ private static final _Fields optionals[] = {_Fields.CAT_NAME,_Fields.TO_EVENT_ID,_Fields.LIMIT};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -129,6 +141,10 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.CAT_NAME, new org.apache.thrift.meta_data.FieldMetaData("catName", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TO_EVENT_ID, new org.apache.thrift.meta_data.FieldMetaData("toEventId", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.LIMIT, new org.apache.thrift.meta_data.FieldMetaData("limit", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NotificationEventsCountRequest.class, metaDataMap);
}
@@ -158,6 +174,8 @@ import org.slf4j.LoggerFactory;
if (other.isSetCatName()) {
this.catName = other.catName;
}
+ this.toEventId = other.toEventId;
+ this.limit = other.limit;
}
public NotificationEventsCountRequest deepCopy() {
@@ -170,6 +188,10 @@ import org.slf4j.LoggerFactory;
this.fromEventId = 0;
this.dbName = null;
this.catName = null;
+ setToEventIdIsSet(false);
+ this.toEventId = 0;
+ setLimitIsSet(false);
+ this.limit = 0;
}
public long getFromEventId() {
@@ -240,6 +262,50 @@ import org.slf4j.LoggerFactory;
}
}
+ public long getToEventId() {
+ return this.toEventId;
+ }
+
+ public void setToEventId(long toEventId) {
+ this.toEventId = toEventId;
+ setToEventIdIsSet(true);
+ }
+
+ public void unsetToEventId() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TOEVENTID_ISSET_ID);
+ }
+
+ /** Returns true if field toEventId is set (has been assigned a value) and false otherwise */
+ public boolean isSetToEventId() {
+ return EncodingUtils.testBit(__isset_bitfield, __TOEVENTID_ISSET_ID);
+ }
+
+ public void setToEventIdIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TOEVENTID_ISSET_ID, value);
+ }
+
+ public long getLimit() {
+ return this.limit;
+ }
+
+ public void setLimit(long limit) {
+ this.limit = limit;
+ setLimitIsSet(true);
+ }
+
+ public void unsetLimit() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __LIMIT_ISSET_ID);
+ }
+
+ /** Returns true if field limit is set (has been assigned a value) and false otherwise */
+ public boolean isSetLimit() {
+ return EncodingUtils.testBit(__isset_bitfield, __LIMIT_ISSET_ID);
+ }
+
+ public void setLimitIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __LIMIT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case FROM_EVENT_ID:
@@ -266,6 +332,22 @@ import org.slf4j.LoggerFactory;
}
break;
+ case TO_EVENT_ID:
+ if (value == null) {
+ unsetToEventId();
+ } else {
+ setToEventId((Long)value);
+ }
+ break;
+
+ case LIMIT:
+ if (value == null) {
+ unsetLimit();
+ } else {
+ setLimit((Long)value);
+ }
+ break;
+
}
}
@@ -280,6 +362,12 @@ import org.slf4j.LoggerFactory;
case CAT_NAME:
return getCatName();
+ case TO_EVENT_ID:
+ return getToEventId();
+
+ case LIMIT:
+ return getLimit();
+
}
throw new IllegalStateException();
}
@@ -297,6 +385,10 @@ import org.slf4j.LoggerFactory;
return isSetDbName();
case CAT_NAME:
return isSetCatName();
+ case TO_EVENT_ID:
+ return isSetToEventId();
+ case LIMIT:
+ return isSetLimit();
}
throw new IllegalStateException();
}
@@ -341,6 +433,24 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_toEventId = true && this.isSetToEventId();
+ boolean that_present_toEventId = true && that.isSetToEventId();
+ if (this_present_toEventId || that_present_toEventId) {
+ if (!(this_present_toEventId && that_present_toEventId))
+ return false;
+ if (this.toEventId != that.toEventId)
+ return false;
+ }
+
+ boolean this_present_limit = true && this.isSetLimit();
+ boolean that_present_limit = true && that.isSetLimit();
+ if (this_present_limit || that_present_limit) {
+ if (!(this_present_limit && that_present_limit))
+ return false;
+ if (this.limit != that.limit)
+ return false;
+ }
+
return true;
}
@@ -363,6 +473,16 @@ import org.slf4j.LoggerFactory;
if (present_catName)
list.add(catName);
+ boolean present_toEventId = true && (isSetToEventId());
+ list.add(present_toEventId);
+ if (present_toEventId)
+ list.add(toEventId);
+
+ boolean present_limit = true && (isSetLimit());
+ list.add(present_limit);
+ if (present_limit)
+ list.add(limit);
+
return list.hashCode();
}
@@ -404,6 +524,26 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetToEventId()).compareTo(other.isSetToEventId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetToEventId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.toEventId, other.toEventId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetLimit()).compareTo(other.isSetLimit());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetLimit()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.limit, other.limit);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -445,6 +585,18 @@ import org.slf4j.LoggerFactory;
}
first = false;
}
+ if (isSetToEventId()) {
+ if (!first) sb.append(", ");
+ sb.append("toEventId:");
+ sb.append(this.toEventId);
+ first = false;
+ }
+ if (isSetLimit()) {
+ if (!first) sb.append(", ");
+ sb.append("limit:");
+ sb.append(this.limit);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -522,6 +674,22 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // TO_EVENT_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.toEventId = iprot.readI64();
+ struct.setToEventIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 5: // LIMIT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.limit = iprot.readI64();
+ struct.setLimitIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -550,6 +718,16 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetToEventId()) {
+ oprot.writeFieldBegin(TO_EVENT_ID_FIELD_DESC);
+ oprot.writeI64(struct.toEventId);
+ oprot.writeFieldEnd();
+ }
+ if (struct.isSetLimit()) {
+ oprot.writeFieldBegin(LIMIT_FIELD_DESC);
+ oprot.writeI64(struct.limit);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -573,10 +751,22 @@ import org.slf4j.LoggerFactory;
if (struct.isSetCatName()) {
optionals.set(0);
}
- oprot.writeBitSet(optionals, 1);
+ if (struct.isSetToEventId()) {
+ optionals.set(1);
+ }
+ if (struct.isSetLimit()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetCatName()) {
oprot.writeString(struct.catName);
}
+ if (struct.isSetToEventId()) {
+ oprot.writeI64(struct.toEventId);
+ }
+ if (struct.isSetLimit()) {
+ oprot.writeI64(struct.limit);
+ }
}
@Override
@@ -586,11 +776,19 @@ import org.slf4j.LoggerFactory;
struct.setFromEventIdIsSet(true);
struct.dbName = iprot.readString();
struct.setDbNameIsSet(true);
- BitSet incoming = iprot.readBitSet(1);
+ BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.catName = iprot.readString();
struct.setCatNameIsSet(true);
}
+ if (incoming.get(1)) {
+ struct.toEventId = iprot.readI64();
+ struct.setToEventIdIsSet(true);
+ }
+ if (incoming.get(2)) {
+ struct.limit = iprot.readI64();
+ struct.setLimitIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 775c9d9..5fd5d78 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -22619,6 +22619,14 @@ class NotificationEventsCountRequest {
* @var string
*/
public $catName = null;
+ /**
+ * @var int
+ */
+ public $toEventId = null;
+ /**
+ * @var int
+ */
+ public $limit = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -22635,6 +22643,14 @@ class NotificationEventsCountRequest {
'var' => 'catName',
'type' => TType::STRING,
),
+ 4 => array(
+ 'var' => 'toEventId',
+ 'type' => TType::I64,
+ ),
+ 5 => array(
+ 'var' => 'limit',
+ 'type' => TType::I64,
+ ),
);
}
if (is_array($vals)) {
@@ -22647,6 +22663,12 @@ class NotificationEventsCountRequest {
if (isset($vals['catName'])) {
$this->catName = $vals['catName'];
}
+ if (isset($vals['toEventId'])) {
+ $this->toEventId = $vals['toEventId'];
+ }
+ if (isset($vals['limit'])) {
+ $this->limit = $vals['limit'];
+ }
}
}
@@ -22690,6 +22712,20 @@ class NotificationEventsCountRequest {
$xfer += $input->skip($ftype);
}
break;
+ case 4:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->toEventId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
+ case 5:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->limit);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -22718,6 +22754,16 @@ class NotificationEventsCountRequest {
$xfer += $output->writeString($this->catName);
$xfer += $output->writeFieldEnd();
}
+ if ($this->toEventId !== null) {
+ $xfer += $output->writeFieldBegin('toEventId', TType::I64, 4);
+ $xfer += $output->writeI64($this->toEventId);
+ $xfer += $output->writeFieldEnd();
+ }
+ if ($this->limit !== null) {
+ $xfer += $output->writeFieldBegin('limit', TType::I64, 5);
+ $xfer += $output->writeI64($this->limit);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 9d3885c..03c2a4e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15814,6 +15814,8 @@ class NotificationEventsCountRequest:
- fromEventId
- dbName
- catName
+ - toEventId
+ - limit
"""
thrift_spec = (
@@ -15821,12 +15823,16 @@ class NotificationEventsCountRequest:
(1, TType.I64, 'fromEventId', None, None, ), # 1
(2, TType.STRING, 'dbName', None, None, ), # 2
(3, TType.STRING, 'catName', None, None, ), # 3
+ (4, TType.I64, 'toEventId', None, None, ), # 4
+ (5, TType.I64, 'limit', None, None, ), # 5
)
- def __init__(self, fromEventId=None, dbName=None, catName=None,):
+ def __init__(self, fromEventId=None, dbName=None, catName=None, toEventId=None, limit=None,):
self.fromEventId = fromEventId
self.dbName = dbName
self.catName = catName
+ self.toEventId = toEventId
+ self.limit = limit
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15852,6 +15858,16 @@ class NotificationEventsCountRequest:
self.catName = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.I64:
+ self.toEventId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.I64:
+ self.limit = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -15874,6 +15890,14 @@ class NotificationEventsCountRequest:
oprot.writeFieldBegin('catName', TType.STRING, 3)
oprot.writeString(self.catName)
oprot.writeFieldEnd()
+ if self.toEventId is not None:
+ oprot.writeFieldBegin('toEventId', TType.I64, 4)
+ oprot.writeI64(self.toEventId)
+ oprot.writeFieldEnd()
+ if self.limit is not None:
+ oprot.writeFieldBegin('limit', TType.I64, 5)
+ oprot.writeI64(self.limit)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -15890,6 +15914,8 @@ class NotificationEventsCountRequest:
value = (value * 31) ^ hash(self.fromEventId)
value = (value * 31) ^ hash(self.dbName)
value = (value * 31) ^ hash(self.catName)
+ value = (value * 31) ^ hash(self.toEventId)
+ value = (value * 31) ^ hash(self.limit)
return value
def __repr__(self):
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 26b89c0..2eea181 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3518,11 +3518,15 @@ class NotificationEventsCountRequest
FROMEVENTID = 1
DBNAME = 2
CATNAME = 3
+ TOEVENTID = 4
+ LIMIT = 5
FIELDS = {
FROMEVENTID => {:type => ::Thrift::Types::I64, :name => 'fromEventId'},
DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
- CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true}
+ CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName', :optional => true},
+ TOEVENTID => {:type => ::Thrift::Types::I64, :name => 'toEventId', :optional => true},
+ LIMIT => {:type => ::Thrift::Types::I64, :name => 'limit', :optional => true}
}
def struct_fields; FIELDS; end
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index a2a6740..4b7b615 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1147,7 +1147,9 @@ struct CurrentNotificationEventId {
struct NotificationEventsCountRequest {
1: required i64 fromEventId,
2: required string dbName,
- 3: optional string catName
+ 3: optional string catName,
+ 4: optional i64 toEventId,
+ 5: optional i64 limit
}
struct NotificationEventsCountResponse {
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index ddd64e7..9c15804 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -10260,14 +10260,60 @@ public class ObjectStore implements RawStore, Configurable {
long fromEventId = rqst.getFromEventId();
String inputDbName = rqst.getDbName();
String catName = rqst.isSetCatName() ? rqst.getCatName() : getDefaultCatalog(conf);
- String queryStr = "select count(eventId) from " + MNotificationLog.class.getName()
- + " where eventId > fromEventId && dbName == inputDbName && catalogName == catName";
+ long toEventId;
+ String paramSpecs;
+ List<Object> paramVals = new ArrayList<Object>();
+
+ // We store a catalog name in lower case in metastore and also use the same way everywhere in
+ // hive.
+ assert catName.equals(catName.toLowerCase());
+
+ // Build the query to count events, part by part
+ String queryStr = "select count(eventId) from " + MNotificationLog.class.getName();
+ // count fromEventId onwards events
+ queryStr = queryStr + " where eventId > fromEventId";
+ paramSpecs = "java.lang.Long fromEventId";
+ paramVals.add(Long.valueOf(fromEventId));
+
+ // Input database name can be a database name or a *. In the first case we add a filter
+ // condition on dbName column, but not in the second case, since a * means all the
+ // databases. In case we support more elaborate database name patterns in future, we will
+ // have to apply a method similar to getNextNotification() method of MetaStoreClient.
+ if (!inputDbName.equals("*")) {
+ // dbName could be NULL in case of transaction related events, which also need to be
+ // counted.
+ queryStr = queryStr + " && (dbName == inputDbName || dbName == null)";
+ paramSpecs = paramSpecs + ", java.lang.String inputDbName";
+ // We store a database name in lower case in metastore.
+ paramVals.add(inputDbName.toLowerCase());
+ }
+
+ // catName could be NULL in case of transaction related events, which also need to be
+ // counted.
+ queryStr = queryStr + " && (catalogName == catName || catalogName == null)";
+ paramSpecs = paramSpecs +", java.lang.String catName";
+ paramVals.add(catName);
+
+ // count events upto toEventId if specified
+ if (rqst.isSetToEventId()) {
+ toEventId = rqst.getToEventId();
+ queryStr = queryStr + " && eventId <= toEventId";
+ paramSpecs = paramSpecs + ", java.lang.Long toEventId";
+ paramVals.add(Long.valueOf(toEventId));
+ }
+
query = pm.newQuery(queryStr);
- query.declareParameters("java.lang.Long fromEventId, java.lang.String inputDbName," +
- " java.lang.String catName");
- result = (Long) query.execute(fromEventId, inputDbName, catName);
+ query.declareParameters(paramSpecs);
+ result = (Long) query.executeWithArray(paramVals.toArray());
commited = commitTransaction();
- return new NotificationEventsCountResponse(result.longValue());
+
+ // Cap the event count by limit if specified.
+ long eventCount = result.longValue();
+ if (rqst.isSetLimit() && eventCount > rqst.getLimit()) {
+ eventCount = rqst.getLimit();
+ }
+
+ return new NotificationEventsCountResponse(eventCount);
} finally {
rollbackAndCleanup(commited, query);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0d4d03fd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 351fafd..0bb739f 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1652,7 +1652,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
EventMessage.EventType.ALLOC_WRITE_ID,
- new AllocWriteIdEvent(txnToWriteIds, rqst.getDbName(), rqst.getTableName(), null),
+ new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null),
dbConn, sqlGenerator);
}