You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/19 22:10:09 UTC
[8/8] hive git commit: HIVE-15294: Capture additional metadata to
replicate a simple insert at destination (Vaibhav Gumashta reviewed by Thejas
Nair)
HIVE-15294: Capture additional metadata to replicate a simple insert at destination (Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bbd99ed6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bbd99ed6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bbd99ed6
Branch: refs/heads/master
Commit: bbd99ed60e5708af3dc329b097d4b024f73041bd
Parents: 12f5550
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Mon Dec 19 14:02:01 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Mon Dec 19 14:02:01 2016 -0800
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 55 +-
.../listener/TestDbNotificationListener.java | 955 +++++---
metastore/if/hive_metastore.thrift | 4 +-
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2054 ++++++++--------
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 821 ++++---
.../gen/thrift/gen-cpp/hive_metastore_types.h | 13 +
.../metastore/api/ClearFileMetadataRequest.java | 32 +-
.../hive/metastore/api/ClientCapabilities.java | 32 +-
.../hive/metastore/api/FireEventRequest.java | 32 +-
.../metastore/api/GetAllFunctionsResponse.java | 36 +-
.../api/GetFileMetadataByExprRequest.java | 32 +-
.../api/GetFileMetadataByExprResult.java | 48 +-
.../metastore/api/GetFileMetadataRequest.java | 32 +-
.../metastore/api/GetFileMetadataResult.java | 44 +-
.../hive/metastore/api/GetTablesRequest.java | 32 +-
.../hive/metastore/api/GetTablesResult.java | 36 +-
.../metastore/api/InsertEventRequestData.java | 181 +-
.../metastore/api/PutFileMetadataRequest.java | 64 +-
.../hive/metastore/api/ThriftHiveMetastore.java | 2220 +++++++++---------
.../gen-php/metastore/ThriftHiveMetastore.php | 1250 +++++-----
.../src/gen/thrift/gen-php/metastore/Types.php | 291 ++-
.../hive_metastore/ThriftHiveMetastore.py | 842 +++----
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 185 +-
.../gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 28 +-
.../hive/metastore/events/InsertEvent.java | 36 +-
.../metastore/messaging/MessageFactory.java | 22 +-
.../messaging/json/JSONInsertMessage.java | 54 +-
.../messaging/json/JSONMessageFactory.java | 36 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 56 +-
.../apache/hadoop/fs/ProxyLocalFileSystem.java | 104 +-
31 files changed, 5202 insertions(+), 4429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 119801f..8d29bfc 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +104,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param tableEvent table event.
* @throws org.apache.hadoop.hive.metastore.api.MetaException
*/
+ @Override
public void onConfigChange(ConfigChangeEvent tableEvent) throws MetaException {
String key = tableEvent.getKey();
if (key.equals(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL.toString())) {
@@ -122,6 +122,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param tableEvent table event.
* @throws MetaException
*/
+ @Override
public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
Table t = tableEvent.getTable();
NotificationEvent event =
@@ -129,13 +130,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildCreateTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param tableEvent table event.
* @throws MetaException
*/
+ @Override
public void onDropTable(DropTableEvent tableEvent) throws MetaException {
Table t = tableEvent.getTable();
NotificationEvent event =
@@ -143,13 +145,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param tableEvent alter table event
* @throws MetaException
*/
+ @Override
public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
Table before = tableEvent.getOldTable();
Table after = tableEvent.getNewTable();
@@ -158,13 +161,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildAlterTableMessage(before, after).toString());
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param partitionEvent partition event
* @throws MetaException
*/
+ @Override
public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
Table t = partitionEvent.getTable();
String msg = msgFactory
@@ -173,13 +177,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param partitionEvent partition event
* @throws MetaException
*/
+ @Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
Table t = partitionEvent.getTable();
NotificationEvent event =
@@ -187,13 +192,14 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param partitionEvent partition event
* @throws MetaException
*/
+ @Override
public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
Partition before = partitionEvent.getOldPartition();
Partition after = partitionEvent.getNewPartition();
@@ -202,91 +208,98 @@ public class DbNotificationListener extends MetaStoreEventListener {
.buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
- enqueue(event);
+ process(event);
}
/**
* @param dbEvent database event
* @throws MetaException
*/
+ @Override
public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
.buildCreateDatabaseMessage(db).toString());
event.setDbName(db.getName());
- enqueue(event);
+ process(event);
}
/**
* @param dbEvent database event
* @throws MetaException
*/
+ @Override
public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
.buildDropDatabaseMessage(db).toString());
event.setDbName(db.getName());
- enqueue(event);
+ process(event);
}
/**
* @param fnEvent function event
* @throws MetaException
*/
+ @Override
public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
.buildCreateFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- enqueue(event);
+ process(event);
}
/**
* @param fnEvent function event
* @throws MetaException
*/
+ @Override
public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
.buildDropFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
- enqueue(event);
+ process(event);
}
/**
* @param indexEvent index event
* @throws MetaException
*/
+ @Override
public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
Index index = indexEvent.getIndex();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
.buildCreateIndexMessage(index).toString());
event.setDbName(index.getDbName());
- enqueue(event);
+ process(event);
}
/**
* @param indexEvent index event
* @throws MetaException
*/
+ @Override
public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
Index index = indexEvent.getIndex();
NotificationEvent event =
new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
.buildDropIndexMessage(index).toString());
event.setDbName(index.getDbName());
- enqueue(event);
+ process(event);
}
/**
* @param indexEvent index event
* @throws MetaException
*/
+ @Override
public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
Index before = indexEvent.getOldIndex();
Index after = indexEvent.getNewIndex();
@@ -294,7 +307,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
.buildAlterIndexMessage(before, after).toString());
event.setDbName(before.getDbName());
- enqueue(event);
+ process(event);
}
@Override
@@ -302,20 +315,20 @@ public class DbNotificationListener extends MetaStoreEventListener {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
- insertEvent.getFiles()).toString());
+ insertEvent.getFiles(), insertEvent.getFileChecksums()).toString());
event.setDbName(insertEvent.getDb());
event.setTableName(insertEvent.getTable());
- enqueue(event);
+ process(event);
}
/**
* @param partSetDoneEvent
* @throws MetaException
*/
+ @Override
public void onLoadPartitionDone(LoadPartitionDoneEvent partSetDoneEvent) throws MetaException {
// TODO, we don't support this, but we should, since users may create an empty partition and
// then load data into it.
-
}
private int now() {
@@ -329,10 +342,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
return (int)millis;
}
- private void enqueue(NotificationEvent event) {
+ // Process this notification by adding it to metastore DB
+ private void process(NotificationEvent event) {
if (rs != null) {
- synchronized(NOTIFICATION_TBL_LOCK) {
- LOG.debug("DbNotif:Enqueueing : {}:{}",event.getEventId(),event.getMessage());
+ synchronized (NOTIFICATION_TBL_LOCK) {
+ LOG.debug("DbNotificationListener: Processing : {}:{}", event.getEventId(),
+ event.getMessage());
rs.addNotificationEvent(event);
}
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 0b691b1..9e6af8f 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -15,7 +15,7 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-*/
+ */
package org.apache.hive.hcatalog.listener;
import static org.junit.Assert.assertEquals;
@@ -27,7 +27,7 @@ import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -53,16 +53,11 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -70,8 +65,13 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Tests DbNotificationListener when used as a transactional event listener
+ * (hive.metastore.transactional.event.listeners)
+ */
public class TestDbNotificationListener {
- private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class
+ .getName());
private static final int EVENTS_TTL = 30;
private static final int CLEANUP_SLEEP_TIME = 10;
private static Map<String, String> emptyParameters = new HashMap<String, String>();
@@ -86,12 +86,11 @@ public class TestDbNotificationListener {
HiveConf conf = new HiveConf();
conf.setVar(HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
DbNotificationListener.class.getName());
- conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL)+"s");
+ conf.setVar(HiveConf.ConfVars.METASTORE_EVENT_DB_LISTENER_TTL, String.valueOf(EVENTS_TTL) + "s");
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
conf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
conf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
- conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
- DummyRawStoreFailEvent.class.getName());
+ conf.setVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL, DummyRawStoreFailEvent.class.getName());
Class dbNotificationListener =
Class.forName("org.apache.hive.hcatalog.listener.DbNotificationListener");
Class[] classes = dbNotificationListener.getDeclaredClasses();
@@ -102,8 +101,7 @@ public class TestDbNotificationListener {
sleepTimeField.set(null, CLEANUP_SLEEP_TIME * 1000);
}
}
- conf
- .setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
SessionState.start(new CliSessionState(conf));
msClient = new HiveMetaStoreClient(conf);
@@ -114,363 +112,488 @@ public class TestDbNotificationListener {
public void setup() throws Exception {
long now = System.currentTimeMillis() / 1000;
startTime = 0;
- if (now > Integer.MAX_VALUE) fail("Bummer, time has fallen over the edge");
- else startTime = (int) now;
+ if (now > Integer.MAX_VALUE) {
+ fail("Bummer, time has fallen over the edge");
+ } else {
+ startTime = (int) now;
+ }
firstEventId = msClient.getCurrentNotificationEventId().getEventId();
DummyRawStoreFailEvent.setEventSucceed(true);
}
@Test
public void createDatabase() throws Exception {
- Database db = new Database("mydb", "no description", "file:/tmp", emptyParameters);
+ String dbName = "createdb";
+ String dbName2 = "createdb2";
+ String dbLocationUri = "file:/tmp";
+ String dbDescription = "no description";
+ Database db = new Database(dbName, dbDescription, dbLocationUri, emptyParameters);
msClient.createDatabase(db);
+ // Read notification from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
+ // Read event from notification
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
- assertEquals("mydb", event.getDbName());
+ assertEquals(EventType.CREATE_DATABASE.toString(), event.getEventType());
+ assertEquals(dbName, event.getDbName());
assertNull(event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_DATABASE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"mydb\",\"timestamp\":[0-9]+}"));
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(EventType.CREATE_DATABASE.toString(), jsonTree.get("eventType").asText());
+ assertEquals(dbName, jsonTree.get("db").asText());
+
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
- db = new Database("mydb2", "no description", "file:/tmp", emptyParameters);
+ db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
try {
msClient.createDatabase(db);
+ fail("Error: create database should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
}
@Test
public void dropDatabase() throws Exception {
- Database db = new Database("dropdb", "no description", "file:/tmp", emptyParameters);
+ String dbName = "dropdb";
+ String dbName2 = "dropdb2";
+ String dbLocationUri = "file:/tmp";
+ String dbDescription = "no description";
+ Database db = new Database(dbName, dbDescription, dbLocationUri, emptyParameters);
msClient.createDatabase(db);
- msClient.dropDatabase("dropdb");
+ msClient.dropDatabase(dbName);
+ // Read notification from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+
+ // Two events: one for create db and other for drop db
assertEquals(2, rsp.getEventsSize());
+ // Read event from notification
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
- assertEquals("dropdb", event.getDbName());
+ assertEquals(EventType.DROP_DATABASE.toString(), event.getEventType());
+ assertEquals(dbName, event.getDbName());
assertNull(event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_DATABASE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"dropdb\",\"timestamp\":[0-9]+}"));
- db = new Database("dropdb", "no description", "file:/tmp", emptyParameters);
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(EventType.DROP_DATABASE.toString(), jsonTree.get("eventType").asText());
+ assertEquals(dbName, jsonTree.get("db").asText());
+
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ db = new Database(dbName2, dbDescription, dbLocationUri, emptyParameters);
msClient.createDatabase(db);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.dropDatabase("dropdb");
+ msClient.dropDatabase(dbName2);
+ fail("Error: drop database should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
}
@Test
public void createTable() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "createtable";
+ String tblName2 = "createtable2";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
StorageDescriptor sd =
- new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null,
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
emptyParameters);
- Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+ emptyParameters, null, null, null);
msClient.createTable(table);
- // Get the event
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("mytable", event.getTableName());
+ assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
// Parse the message field
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
- assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
- assertEquals("default", jsonTree.get("db").asText());
- assertEquals("mytable", jsonTree.get("table").asText());
+ assertEquals(EventType.CREATE_TABLE.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
assertEquals(table, tableObj);
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
table =
- new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
- null, null, null);
+ new Table(tblName2, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+ emptyParameters, null, null, null);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.createTable(table);
+ fail("Error: create table should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
}
@Test
public void alterTable() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "altertabletbl";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
+ FieldSchema col2 = new FieldSchema("col2", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
- new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd,
+ new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+
// Event 1
msClient.createTable(table);
- cols.add(new FieldSchema("col2", "int", ""));
- table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
- new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+ cols.add(col2);
+ table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd,
+ new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
// Event 2
- msClient.alter_table("default", "alttable", table);
+ msClient.alter_table(defaultDbName, tblName, table);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("alttable", event.getTableName());
+ assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
// Parse the message field
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
- assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText());
- assertEquals("default", jsonTree.get("db").asText());
- assertEquals("alttable", jsonTree.get("table").asText());
+ assertEquals(EventType.ALTER_TABLE.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
assertEquals(table, tableObj);
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.alter_table("default", "alttable", table);
+ msClient.alter_table(defaultDbName, tblName, table);
+ fail("Error: alter table should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
}
@Test
public void dropTable() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "droptbl";
+ String tblName2 = "droptbl2";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("droptable", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+ emptyParameters, null, null, null);
+
+ // Event 1
msClient.createTable(table);
- msClient.dropTable("default", "droptable");
+ // Event 2
+ msClient.dropTable(defaultDbName, tblName);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("droptable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_TABLE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"droptable\",\"timestamp\":[0-9]+}"));
-
- table = new Table("droptable2", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
+
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(EventType.DROP_TABLE.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
+
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ table =
+ new Table(tblName2, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+ emptyParameters, null, null, null);
msClient.createTable(table);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.dropTable("default", "droptable2");
+ msClient.dropTable(defaultDbName, tblName2);
+ fail("Error: drop table should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
}
@Test
public void addPartition() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "addptn";
+ String tblName2 = "addptn2";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
- List<FieldSchema> partCols = new ArrayList<FieldSchema>();
- partCols.add(new FieldSchema("ds", "string", ""));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("addPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
- emptyParameters, null, null, null);
- msClient.createTable(table);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<String> partCol1Vals = Arrays.asList("today");
+ partCols.add(partCol1);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
- Partition partition = new Partition(Arrays.asList("today"), "default", "addPartTable",
- startTime, startTime, sd, emptyParameters);
+ // Event 1
+ msClient.createTable(table);
+ Partition partition =
+ new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+ emptyParameters);
+ // Event 2
msClient.add_partition(partition);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("addparttable", event.getTableName());
+ assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
// Parse the message field
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
- assertEquals("default", jsonTree.get("db").asText());
- assertEquals("addparttable", jsonTree.get("table").asText());
+ assertEquals(EventType.ADD_PARTITION.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
assertEquals(partition, partitionObjList.get(0));
- partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist",
- startTime, startTime, sd, emptyParameters);
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ partition =
+ new Partition(Arrays.asList("tomorrow"), defaultDbName, tblName2, startTime, startTime, sd,
+ emptyParameters);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.add_partition(partition);
+ fail("Error: add partition should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
}
@Test
public void alterPartition() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "alterptn";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
- List<FieldSchema> partCols = new ArrayList<FieldSchema>();
- partCols.add(new FieldSchema("ds", "string", ""));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("alterparttable", "default", "me", startTime, startTime, 0, sd,
- partCols, emptyParameters, null, null, null);
- msClient.createTable(table);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<String> partCol1Vals = Arrays.asList("today");
+ partCols.add(partCol1);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
- Partition partition = new Partition(Arrays.asList("today"), "default", "alterparttable",
- startTime, startTime, sd, emptyParameters);
+ // Event 1
+ msClient.createTable(table);
+ Partition partition =
+ new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+ emptyParameters);
+ // Event 2
msClient.add_partition(partition);
+ Partition newPart =
+ new Partition(Arrays.asList("today"), defaultDbName, tblName, startTime, startTime + 1, sd,
+ emptyParameters);
+ // Event 3
+ msClient.alter_partition(defaultDbName, tblName, newPart, null);
- Partition newPart = new Partition(Arrays.asList("today"), "default", "alterparttable",
- startTime, startTime + 1, sd, emptyParameters);
- msClient.alter_partition("default", "alterparttable", newPart, null);
-
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("alterparttable", event.getTableName());
+ assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
// Parse the message field
ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
- assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, jsonTree.get("eventType").asText());
- assertEquals("default", jsonTree.get("db").asText());
- assertEquals("alterparttable", jsonTree.get("table").asText());
+ assertEquals(EventType.ALTER_PARTITION.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
assertEquals(newPart, partitionObjList.get(0));
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.alter_partition("default", "alterparttable", newPart, null);
+ msClient.alter_partition(defaultDbName, tblName, newPart, null);
+ fail("Error: alter partition should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
}
@Test
public void dropPartition() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "dropptn";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
- List<FieldSchema> partCols = new ArrayList<FieldSchema>();
- partCols.add(new FieldSchema("ds", "string", ""));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("dropPartTable", "default", "me", startTime, startTime, 0, sd, partCols,
- emptyParameters, null, null, null);
- msClient.createTable(table);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<String> partCol1Vals = Arrays.asList("today");
+ partCols.add(partCol1);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
- Partition partition = new Partition(Arrays.asList("today"), "default", "dropPartTable",
- startTime, startTime, sd, emptyParameters);
+ // Event 1
+ msClient.createTable(table);
+ Partition partition =
+ new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+ emptyParameters);
+ // Event 2
msClient.add_partition(partition);
+ // Event 3
+ msClient.dropPartition(defaultDbName, tblName, partCol1Vals, false);
- msClient.dropPartition("default", "dropparttable", Arrays.asList("today"), false);
-
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("dropparttable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"DROP_PARTITION\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"dropparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
-
- partition = new Partition(Arrays.asList("tomorrow"), "default", "dropPartTable",
- startTime, startTime, sd, emptyParameters);
- msClient.add_partition(partition);
+ assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
+
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(EventType.DROP_PARTITION.toString(), jsonTree.get("eventType").asText());
+ assertEquals(defaultDbName, jsonTree.get("db").asText());
+ assertEquals(tblName, jsonTree.get("table").asText());
+
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ List<String> newpartCol1Vals = Arrays.asList("tomorrow");
+ partition =
+ new Partition(newpartCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+ emptyParameters);
+ msClient.add_partition(partition);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.dropPartition("default", "dropparttable", Arrays.asList("tomorrow"), false);
+ msClient.dropPartition(defaultDbName, tblName, newpartCol1Vals, false);
+ fail("Error: drop partition should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
}
@Test
public void createFunction() throws Exception {
- String funcName = "createFunction";
- String dbName = "default";
+ String defaultDbName = "default";
+ String funcName = "createfunction";
+ String funcName2 = "createfunction2";
String ownerName = "me";
- String funcClass = "o.a.h.h.myfunc";
+ String funcClass = "o.a.h.h.createfunc";
+ String funcClass2 = "o.a.h.h.createfunc2";
String funcResource = "file:/tmp/somewhere";
- Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
- startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
- funcResource)));
+ String funcResource2 = "file:/tmp/somewhere2";
+ Function func =
+ new Function(funcName, defaultDbName, funcClass, ownerName, PrincipalType.USER, startTime,
+ FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, funcResource)));
+ // Event 1
msClient.createFunction(func);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType());
- assertEquals(dbName, event.getDbName());
+ assertEquals(EventType.CREATE_FUNCTION.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+
+ // Parse the message field
Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
- assertEquals(dbName, funcObj.getDbName());
+ assertEquals(defaultDbName, funcObj.getDbName());
assertEquals(funcName, funcObj.getFunctionName());
assertEquals(funcClass, funcObj.getClassName());
assertEquals(ownerName, funcObj.getOwnerName());
@@ -479,41 +602,53 @@ public class TestDbNotificationListener {
assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
- func = new Function("createFunction2", dbName, "o.a.h.h.myfunc2", "me", PrincipalType.USER,
- startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
- "file:/tmp/somewhere2")));
+ func =
+ new Function(funcName2, defaultDbName, funcClass2, ownerName, PrincipalType.USER,
+ startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
+ funcResource2)));
try {
msClient.createFunction(func);
+ fail("Error: create function should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
}
@Test
public void dropFunction() throws Exception {
- String funcName = "dropfunctiontest";
- String dbName = "default";
+ String defaultDbName = "default";
+ String funcName = "dropfunction";
+ String funcName2 = "dropfunction2";
String ownerName = "me";
- String funcClass = "o.a.h.h.dropFunctionTest";
+ String funcClass = "o.a.h.h.dropfunction";
+ String funcClass2 = "o.a.h.h.dropfunction2";
String funcResource = "file:/tmp/somewhere";
- Function func = new Function(funcName, dbName, funcClass, ownerName, PrincipalType.USER,
- startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
- funcResource)));
+ String funcResource2 = "file:/tmp/somewhere2";
+ Function func =
+ new Function(funcName, defaultDbName, funcClass, ownerName, PrincipalType.USER, startTime,
+ FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR, funcResource)));
+ // Event 1
msClient.createFunction(func);
- msClient.dropFunction(dbName, funcName);
+ // Event 2
+ msClient.dropFunction(defaultDbName, funcName);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType());
- assertEquals(dbName, event.getDbName());
+ assertEquals(EventType.DROP_FUNCTION.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+
+ // Parse the message field
Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
- assertEquals(dbName, funcObj.getDbName());
+ assertEquals(defaultDbName, funcObj.getDbName());
assertEquals(funcName, funcObj.getFunctionName());
assertEquals(funcClass, funcObj.getClassName());
assertEquals(ownerName, funcObj.getOwnerName());
@@ -522,17 +657,20 @@ public class TestDbNotificationListener {
assertEquals(ResourceType.JAR, funcObj.getResourceUris().get(0).getResourceType());
assertEquals(funcResource, funcObj.getResourceUris().get(0).getUri());
- func = new Function("dropfunctiontest2", dbName, "o.a.h.h.dropFunctionTest2", "me",
- PrincipalType.USER, startTime, FunctionType.JAVA, Arrays.asList(
- new ResourceUri(ResourceType.JAR, "file:/tmp/somewhere2")));
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ func =
+ new Function(funcName2, defaultDbName, funcClass2, ownerName, PrincipalType.USER,
+ startTime, FunctionType.JAVA, Arrays.asList(new ResourceUri(ResourceType.JAR,
+ funcResource2)));
msClient.createFunction(func);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.dropFunction(dbName, "dropfunctiontest2");
+ msClient.dropFunction(defaultDbName, funcName2);
+ fail("Error: drop function should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
}
@@ -543,46 +681,60 @@ public class TestDbNotificationListener {
String dbName = "default";
String tableName = "createIndexTable";
String indexTableName = tableName + "__" + indexName + "__";
- int startTime = (int)(System.currentTimeMillis() / 1000);
+ int startTime = (int) (System.currentTimeMillis() / 1000);
List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(new FieldSchema("col1", "int", ""));
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
Map<String, String> params = new HashMap<String, String>();
params.put("key", "value");
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
- serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
- Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+ Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+ Table table =
+ new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 1
msClient.createTable(table);
- Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
- indexTableName, sd, emptyParameters, false);
- Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ Index index =
+ new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+ emptyParameters, false);
+ Table indexTable =
+ new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 2, 3 (index table and index)
msClient.createIndex(index, indexTable);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType());
+ assertEquals(EventType.CREATE_INDEX.toString(), event.getEventType());
assertEquals(dbName, event.getDbName());
+
+ // Parse the message field
Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName, indexObj.getIndexName());
assertEquals(tableName, indexObj.getOrigTableName());
assertEquals(indexTableName, indexObj.getIndexTableName());
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
- index = new Index("createIndexTable2", null, "default", tableName, startTime, startTime,
- "createIndexTable2__createIndexTable2__", sd, emptyParameters, false);
- Table indexTable2 = new Table("createIndexTable2__createIndexTable2__", dbName, "me",
- startTime, startTime, 0, sd, null, emptyParameters, null, null, null);
+ index =
+ new Index("createIndexTable2", null, "default", tableName, startTime, startTime,
+ "createIndexTable2__createIndexTable2__", sd, emptyParameters, false);
+ Table indexTable2 =
+ new Table("createIndexTable2__createIndexTable2__", dbName, "me", startTime, startTime, 0,
+ sd, null, emptyParameters, null, null, null);
try {
msClient.createIndex(index, indexTable2);
+ fail("Error: create index should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
}
@@ -593,44 +745,61 @@ public class TestDbNotificationListener {
String dbName = "default";
String tableName = "dropIndexTable";
String indexTableName = tableName + "__" + indexName + "__";
- int startTime = (int)(System.currentTimeMillis() / 1000);
+ int startTime = (int) (System.currentTimeMillis() / 1000);
List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(new FieldSchema("col1", "int", ""));
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
Map<String, String> params = new HashMap<String, String>();
params.put("key", "value");
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
- serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
- Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+ Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+ Table table =
+ new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 1
msClient.createTable(table);
- Index index = new Index(indexName, null, "default", tableName, startTime, startTime,
- indexTableName, sd, emptyParameters, false);
- Table indexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ Index index =
+ new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+ emptyParameters, false);
+ Table indexTable =
+ new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 2, 3 (index table and index)
msClient.createIndex(index, indexTable);
- msClient.dropIndex(dbName, tableName, indexName, true); // drops index and indexTable
+ // Event 4 (drops index and indexTable)
+ msClient.dropIndex(dbName, tableName, indexName, true);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(3);
assertEquals(firstEventId + 4, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType());
+ assertEquals(EventType.DROP_INDEX.toString(), event.getEventType());
assertEquals(dbName, event.getDbName());
+
+ // Parse the message field
Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName.toLowerCase(), indexObj.getIndexName());
assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName());
assertEquals(indexTableName.toLowerCase(), indexObj.getIndexTableName());
- index = new Index("dropIndexTable2", null, "default", tableName, startTime, startTime,
- "dropIndexTable__dropIndexTable2__", sd, emptyParameters, false);
- Table indexTable2 = new Table("dropIndexTable__dropIndexTable2__", dbName, "me", startTime,
- startTime, 0, sd, null, emptyParameters, null, null, null);
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
+ index =
+ new Index("dropIndexTable2", null, "default", tableName, startTime, startTime,
+ "dropIndexTable__dropIndexTable2__", sd, emptyParameters, false);
+ Table indexTable2 =
+ new Table("dropIndexTable__dropIndexTable2__", dbName, "me", startTime, startTime, 0, sd,
+ null, emptyParameters, null, null, null);
msClient.createIndex(index, indexTable2);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
- msClient.dropIndex(dbName, tableName, "dropIndex2", true); // drops index and indexTable
+ // drops index and indexTable
+ msClient.dropIndex(dbName, tableName, "dropIndex2", true);
+ fail("Error: drop index should've failed");
} catch (Exception ex) {
// expected
}
@@ -645,127 +814,165 @@ public class TestDbNotificationListener {
String dbName = "default";
String tableName = "alterIndexTable";
String indexTableName = tableName + "__" + indexName + "__";
- int startTime = (int)(System.currentTimeMillis() / 1000);
+ int startTime = (int) (System.currentTimeMillis() / 1000);
List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(new FieldSchema("col1", "int", ""));
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
Map<String, String> params = new HashMap<String, String>();
params.put("key", "value");
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17,
- serde, Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
- Table table = new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 17, serde,
+ Arrays.asList("bucketcol"), Arrays.asList(new Order("sortcol", 1)), params);
+ Table table =
+ new Table(tableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 1
msClient.createTable(table);
- Index oldIndex = new Index(indexName, null, "default", tableName, startTime, startTime,
- indexTableName, sd, emptyParameters, false);
- Table oldIndexTable = new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ Index oldIndex =
+ new Index(indexName, null, "default", tableName, startTime, startTime, indexTableName, sd,
+ emptyParameters, false);
+ Table oldIndexTable =
+ new Table(indexTableName, dbName, "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
+ // Event 2, 3
msClient.createIndex(oldIndex, oldIndexTable); // creates index and index table
- Index newIndex = new Index(indexName, null, "default", tableName, startTime, startTime + 1,
- indexTableName, sd, emptyParameters, false);
+ Index newIndex =
+ new Index(indexName, null, "default", tableName, startTime, startTime + 1, indexTableName,
+ sd, emptyParameters, false);
+ // Event 4
msClient.alter_index(dbName, tableName, indexName, newIndex);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(3);
assertEquals(firstEventId + 4, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType());
+ assertEquals(EventType.ALTER_INDEX.toString(), event.getEventType());
assertEquals(dbName, event.getDbName());
- Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
+
+ // Parse the message field
+ Index indexObj =
+ JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName, indexObj.getIndexName());
assertEquals(tableName, indexObj.getOrigTableName());
assertEquals(indexTableName, indexObj.getIndexTableName());
assertTrue(indexObj.getCreateTime() < indexObj.getLastAccessTime());
+ // When hive.metastore.transactional.event.listeners is set,
+ // a failed event should not create a new notification
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.alter_index(dbName, tableName, indexName, newIndex);
+ fail("Error: alter index should've failed");
} catch (Exception ex) {
// expected
}
-
rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(4, rsp.getEventsSize());
}
@Test
public void insertTable() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "inserttbl";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ String fileAdded = "/warehouse/mytable/b1";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("insertTable", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, null,
+ emptyParameters, null, null, null);
+ // Event 1
msClient.createTable(table);
FireEventRequestData data = new FireEventRequestData();
InsertEventRequestData insertData = new InsertEventRequestData();
data.setInsertData(insertData);
- insertData.addToFilesAdded("/warehouse/mytable/b1");
+ insertData.addToFilesAdded(fileAdded);
FireEventRequest rqst = new FireEventRequest(true, data);
- rqst.setDbName("default");
- rqst.setTableName("insertTable");
+ rqst.setDbName(defaultDbName);
+ rqst.setTableName(tblName);
+ // Event 2
msClient.fireListenerEvent(rqst);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("insertTable", event.getTableName());
- assertTrue(event.getMessage(),
- event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"insertTable\",\"timestamp\":[0-9]+,\"files\":\\[\"/warehouse/mytable/b1\"]," +
- "\"partKeyVals\":\\{},\"partitionKeyValues\":\\{}}"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
+ // Parse the message field
+ verifyInsertJSON(event, defaultDbName, tblName, false);
}
@Test
public void insertPartition() throws Exception {
+ String defaultDbName = "default";
+ String tblName = "insertptn";
+ String tblOwner = "me";
+ String serdeLocation = "file:/tmp";
+ String fileAdded = "/warehouse/mytable/b1";
+ FieldSchema col1 = new FieldSchema("col1", "int", "no comment");
List<FieldSchema> cols = new ArrayList<FieldSchema>();
- cols.add(new FieldSchema("col1", "int", "nocomment"));
- List<FieldSchema> partCols = new ArrayList<FieldSchema>();
- partCols.add(new FieldSchema("ds", "string", ""));
+ cols.add(col1);
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("insertPartition", "default", "me", startTime, startTime, 0, sd,
- partCols, emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, serdeLocation, "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ FieldSchema partCol1 = new FieldSchema("ds", "string", "no comment");
+ List<FieldSchema> partCols = new ArrayList<FieldSchema>();
+ List<String> partCol1Vals = Arrays.asList("today");
+ LinkedHashMap<String, String> partKeyVals = new LinkedHashMap<String, String>();
+ partKeyVals.put("ds", "today");
+ partCols.add(partCol1);
+ Table table =
+ new Table(tblName, defaultDbName, tblOwner, startTime, startTime, 0, sd, partCols,
+ emptyParameters, null, null, null);
+ // Event 1
msClient.createTable(table);
- Partition partition = new Partition(Arrays.asList("today"), "default", "insertPartition",
- startTime, startTime, sd, emptyParameters);
+ Partition partition =
+ new Partition(partCol1Vals, defaultDbName, tblName, startTime, startTime, sd,
+ emptyParameters);
+ // Event 2
msClient.add_partition(partition);
-
FireEventRequestData data = new FireEventRequestData();
InsertEventRequestData insertData = new InsertEventRequestData();
data.setInsertData(insertData);
- insertData.addToFilesAdded("/warehouse/mytable/today/b1");
+ insertData.addToFilesAdded(fileAdded);
FireEventRequest rqst = new FireEventRequest(true, data);
- rqst.setDbName("default");
- rqst.setTableName("insertPartition");
- rqst.setPartitionVals(Arrays.asList("today"));
+ rqst.setDbName(defaultDbName);
+ rqst.setTableName(tblName);
+ rqst.setPartitionVals(partCol1Vals);
+ // Event 3
msClient.fireListenerEvent(rqst);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(3, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertEquals("default", event.getDbName());
- assertEquals("insertPartition", event.getTableName());
- assertTrue(event.getMessage(),
- event.getMessage().matches("\\{\"eventType\":\"INSERT\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"insertPartition\",\"timestamp\":[0-9]+," +
- "\"files\":\\[\"/warehouse/mytable/today/b1\"],\"partKeyVals\":\\{\"ds\":\"today\"}," +
- "\"partitionKeyValues\":\\{\"ds\":\"today\"}}"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ assertEquals(defaultDbName, event.getDbName());
+ assertEquals(tblName, event.getTableName());
+ // Parse the message field
+ verifyInsertJSON(event, defaultDbName, tblName, false);
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ LinkedHashMap<String, String> partKeyValsFromNotif =
+ JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"),
+ new LinkedHashMap<String, String>());
+ assertEquals(partKeyVals, partKeyValsFromNotif);
}
@Test
@@ -777,6 +984,7 @@ public class TestDbNotificationListener {
db = new Database("db3", "no description", "file:/tmp", emptyParameters);
msClient.createDatabase(db);
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 2, null);
assertEquals(2, rsp.getEventsSize());
assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
@@ -794,10 +1002,11 @@ public class TestDbNotificationListener {
IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
@Override
public boolean accept(NotificationEvent event) {
- return event.getEventType().equals(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+ return event.getEventType().equals(EventType.DROP_DATABASE.toString());
}
};
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, filter);
assertEquals(1, rsp.getEventsSize());
assertEquals(firstEventId + 3, rsp.getEvents().get(0).getEventId());
@@ -814,10 +1023,11 @@ public class TestDbNotificationListener {
IMetaStoreClient.NotificationFilter filter = new IMetaStoreClient.NotificationFilter() {
@Override
public boolean accept(NotificationEvent event) {
- return event.getEventType().equals(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+ return event.getEventType().equals(EventType.CREATE_DATABASE.toString());
}
};
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 1, filter);
assertEquals(1, rsp.getEventsSize());
assertEquals(firstEventId + 1, rsp.getEvents().get(0).getEventId());
@@ -825,161 +1035,223 @@ public class TestDbNotificationListener {
@Test
public void sqlInsertTable() throws Exception {
-
- driver.run("create table sit (c int)");
- driver.run("insert into table sit values (1)");
- driver.run("alter table sit add columns (c2 int)");
- driver.run("drop table sit");
-
+ String defaultDbName = "default";
+ String tblName = "sqlins";
+ // Event 1
+ driver.run("create table " + tblName + " (c int)");
+ // Event 2 (alter: marker stats event), 3 (insert), 4 (alter: stats update event)
+ driver.run("insert into table " + tblName + " values (1)");
+ // Event 5
+ driver.run("alter table " + tblName + " add columns (c2 int)");
+ // Event 6
+ driver.run("drop table " + tblName);
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
- // For reasons not clear to me there's an alter after the create table and one after the
- // insert. I think the one after the insert is a stats calculation.
assertEquals(6, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
- assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+ assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+
event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- // Make sure the files are listed in the insert
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, defaultDbName, tblName, true);
+
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
- assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
+ assertEquals(EventType.ALTER_TABLE.toString(), event.getEventType());
+
event = rsp.getEvents().get(5);
assertEquals(firstEventId + 6, event.getEventId());
- assertEquals(HCatConstants.HCAT_DROP_TABLE_EVENT, event.getEventType());
+ assertEquals(EventType.DROP_TABLE.toString(), event.getEventType());
}
@Test
public void sqlCTAS() throws Exception {
+ String sourceTblName = "sqlctasins1";
+ String targetTblName = "sqlctasins2";
+ // Event 1
+ driver.run("create table " + sourceTblName + " (c int)");
+ // Event 2 (alter: marker stats event), 3 (insert), 4 (alter: stats update event)
+ driver.run("insert into table " + sourceTblName + " values (1)");
+ // Event 5, 6 (alter: stats update event)
+ driver.run("create table " + targetTblName + " as select c from " + sourceTblName);
- driver.run("create table ctas_source (c int)");
- driver.run("insert into table ctas_source values (1)");
- driver.run("create table ctas_target as select c from ctas_source");
-
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
assertEquals(6, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
- assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+ assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
+
event = rsp.getEvents().get(2);
assertEquals(firstEventId + 3, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- // Make sure the files are listed in the insert
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, null, sourceTblName, true);
+
event = rsp.getEvents().get(4);
assertEquals(firstEventId + 5, event.getEventId());
- assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
+ assertEquals(EventType.CREATE_TABLE.toString(), event.getEventType());
}
@Test
public void sqlTempTable() throws Exception {
+ String tempTblName = "sqltemptbl";
+ driver.run("create temporary table " + tempTblName + " (c int)");
+ driver.run("insert into table " + tempTblName + " values (1)");
- LOG.info("XXX Starting temp table");
- driver.run("create temporary table tmp1 (c int)");
- driver.run("insert into table tmp1 values (1)");
-
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
assertEquals(0, rsp.getEventsSize());
}
@Test
public void sqlDb() throws Exception {
+ String dbName = "sqldb";
+ // Event 1
+ driver.run("create database " + dbName);
+ // Event 2
+ driver.run("drop database " + dbName);
- driver.run("create database sd");
- driver.run("drop database sd");
-
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
- assertEquals(HCatConstants.HCAT_CREATE_DATABASE_EVENT, event.getEventType());
+ assertEquals(EventType.CREATE_DATABASE.toString(), event.getEventType());
event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
- assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, event.getEventType());
+ assertEquals(EventType.DROP_DATABASE.toString(), event.getEventType());
}
@Test
public void sqlInsertPartition() throws Exception {
-
- driver.run("create table sip (c int) partitioned by (ds string)");
- driver.run("insert into table sip partition (ds = 'today') values (1)");
- driver.run("insert into table sip partition (ds = 'today') values (2)");
- driver.run("insert into table sip partition (ds) values (3, 'today')");
- driver.run("alter table sip add partition (ds = 'yesterday')");
- driver.run("insert into table sip partition (ds = 'yesterday') values (2)");
-
- driver.run("insert into table sip partition (ds) values (3, 'yesterday')");
- driver.run("insert into table sip partition (ds) values (3, 'tomorrow')");
- driver.run("alter table sip drop partition (ds = 'tomorrow')");
-
- driver.run("insert into table sip partition (ds) values (42, 'todaytwo')");
- driver.run("insert overwrite table sip partition(ds='todaytwo') select c from sip where 'ds'='today'");
-
+ String tblName = "sqlinsptn";
+ // Event 1
+ driver.run("create table " + tblName + " (c int) partitioned by (ds string)");
+ // Event 2, 3, 4
+ driver.run("insert into table " + tblName + " partition (ds = 'today') values (1)");
+ // Event 5, 6, 7
+ driver.run("insert into table " + tblName + " partition (ds = 'today') values (2)");
+ // Event 8, 9, 10
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'today')");
+ // Event 9, 10
+ driver.run("alter table " + tblName + " add partition (ds = 'yesterday')");
+ // Event 10, 11, 12
+ driver.run("insert into table " + tblName + " partition (ds = 'yesterday') values (2)");
+ // Event 12, 13, 14
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'yesterday')");
+ // Event 15, 16, 17
+ driver.run("insert into table " + tblName + " partition (ds) values (3, 'tomorrow')");
+ // Event 18
+ driver.run("alter table " + tblName + " drop partition (ds = 'tomorrow')");
+ // Event 19, 20, 21
+ driver.run("insert into table " + tblName + " partition (ds) values (42, 'todaytwo')");
+ // Event 22, 23, 24
+ driver.run("insert overwrite table " + tblName + " partition(ds='todaytwo') select c from "
+ + tblName + " where 'ds'='today'");
+
+ // Get notifications from metastore
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
-
- for (NotificationEvent ne : rsp.getEvents()) LOG.debug("EVENT: " + ne.getMessage());
- // For reasons not clear to me there's one or more alter partitions after add partition and
- // insert.
assertEquals(24, rsp.getEventsSize());
NotificationEvent event = rsp.getEvents().get(1);
assertEquals(firstEventId + 2, event.getEventId());
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
event = rsp.getEvents().get(3);
assertEquals(firstEventId + 4, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- // Make sure the files are listed in the insert
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, null, tblName, true);
+
event = rsp.getEvents().get(6);
assertEquals(firstEventId + 7, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, null, tblName, true);
+
event = rsp.getEvents().get(9);
assertEquals(firstEventId + 10, event.getEventId());
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
event = rsp.getEvents().get(10);
assertEquals(firstEventId + 11, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, null, tblName, true);
+
event = rsp.getEvents().get(13);
assertEquals(firstEventId + 14, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertTrue(event.getMessage().matches(".*\"files\":\\[\"pfile.*"));
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // Parse the message field
+ verifyInsertJSON(event, null, tblName, true);
+
event = rsp.getEvents().get(16);
assertEquals(firstEventId + 17, event.getEventId());
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
event = rsp.getEvents().get(18);
assertEquals(firstEventId + 19, event.getEventId());
- assertEquals(HCatConstants.HCAT_DROP_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.DROP_PARTITION.toString(), event.getEventType());
event = rsp.getEvents().get(19);
assertEquals(firstEventId + 20, event.getEventId());
- assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ADD_PARTITION.toString(), event.getEventType());
+
event = rsp.getEvents().get(20);
assertEquals(firstEventId + 21, event.getEventId());
- assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
event = rsp.getEvents().get(21);
assertEquals(firstEventId + 22, event.getEventId());
- assertEquals(HCatConstants.HCAT_INSERT_EVENT, event.getEventType());
- assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*")); // replace-overwrite introduces no new files
+ assertEquals(EventType.INSERT.toString(), event.getEventType());
+ // replace-overwrite introduces no new files
+ assertTrue(event.getMessage().matches(".*\"files\":\\[\\].*"));
+
event = rsp.getEvents().get(22);
assertEquals(firstEventId + 23, event.getEventId());
- assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
+
event = rsp.getEvents().get(23);
assertEquals(firstEventId + 24, event.getEventId());
- assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
+ assertEquals(EventType.ALTER_PARTITION.toString(), event.getEventType());
assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
- }
+ }
+
+ private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName,
+ boolean verifyChecksums) throws Exception {
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ System.out.println("JSONInsertMessage: " + jsonTree.toString());
+ assertEquals(EventType.INSERT.toString(), jsonTree.get("eventType").asText());
+ if (dbName != null) {
+ assertEquals(dbName, jsonTree.get("db").asText());
+ }
+ if (tblName != null) {
+ assertEquals(tblName, jsonTree.get("table").asText());
+ }
+ // Should have list of files
+ List<String> files =
+ JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>());
+ assertTrue(files.size() > 0);
+ if (verifyChecksums) {
+ // Should have list of file checksums
+ List<String> fileChecksums =
+ JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"),
+ new ArrayList<String>());
+ assertTrue(fileChecksums.size() > 0);
+
+ }
+ }
@Test
public void cleanupNotifs() throws Exception {
- Database db = new Database("cleanup1","no description","file:/tmp", emptyParameters);
+ Database db = new Database("cleanup1", "no description", "file:/tmp", emptyParameters);
msClient.createDatabase(db);
msClient.dropDatabase("cleanup1");
@@ -988,7 +1260,8 @@ public class TestDbNotificationListener {
assertEquals(2, rsp.getEventsSize());
// sleep for expiry time, and then fetch again
- Thread.sleep(EVENTS_TTL * 2 * 1000); // sleep twice the TTL interval - things should have been cleaned by then.
+ // sleep twice the TTL interval - things should have been cleaned by then.
+ Thread.sleep(EVENTS_TTL * 2 * 1000);
LOG.info("Pulling events again after cleanup");
NotificationEventResponse rsp2 = msClient.getNextNotification(firstEventId, 0, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/bbd99ed6/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 6f77156..79592ea 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -811,7 +811,9 @@ struct CurrentNotificationEventId {
}
struct InsertEventRequestData {
- 1: required list<string> filesAdded
+ 1: required list<string> filesAdded,
+ // Checksum of files (UTF8 encoded string) added during this insert event (at the time they were added)
+ 2: optional list<binary> filesAddedChecksum,
}
union FireEventRequestData {