You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2016/11/23 22:57:31 UTC
[2/2] hive git commit: HIVE-15180: Extend JSONMessageFactory to store
additional information about metadata objects on different table events
(Sushanth Sowmyan, Vaibhav Gumashta reviewed by Thejas Nair)
HIVE-15180: Extend JSONMessageFactory to store additional information about metadata objects on different table events (Sushanth Sowmyan, Vaibhav Gumashta reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3dd28fbb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3dd28fbb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3dd28fbb
Branch: refs/heads/master
Commit: 3dd28fbbb68f73562b92d49449d3dde0afd79ef1
Parents: bfad863
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Wed Nov 23 14:57:04 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Wed Nov 23 14:57:04 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../listener/DbNotificationListener.java | 134 +++++-----
.../api/TestHCatClientNotification.java | 59 +++--
.../listener/TestDbNotificationListener.java | 153 ++++++-----
.../messaging/AddPartitionMessage.java | 51 ++++
.../metastore/messaging/AlterIndexMessage.java | 27 ++
.../messaging/AlterPartitionMessage.java | 40 +++
.../metastore/messaging/AlterTableMessage.java | 34 +++
.../messaging/CreateDatabaseMessage.java | 28 ++
.../messaging/CreateFunctionMessage.java | 27 ++
.../metastore/messaging/CreateIndexMessage.java | 27 ++
.../metastore/messaging/CreateTableMessage.java | 40 +++
.../messaging/DropDatabaseMessage.java | 27 ++
.../messaging/DropFunctionMessage.java | 27 ++
.../metastore/messaging/DropIndexMessage.java | 27 ++
.../messaging/DropPartitionMessage.java | 42 +++
.../metastore/messaging/DropTableMessage.java | 40 +++
.../hive/metastore/messaging/EventMessage.java | 106 ++++++++
.../hive/metastore/messaging/EventUtils.java | 222 ++++++++++++++++
.../hive/metastore/messaging/InsertMessage.java | 59 +++++
.../messaging/MessageDeserializer.java | 145 +++++++++++
.../metastore/messaging/MessageFactory.java | 241 +++++++++++++++++
.../messaging/json/JSONAddPartitionMessage.java | 139 ++++++++++
.../messaging/json/JSONAlterIndexMessage.java | 88 +++++++
.../json/JSONAlterPartitionMessage.java | 124 +++++++++
.../messaging/json/JSONAlterTableMessage.java | 100 +++++++
.../json/JSONCreateDatabaseMessage.java | 71 +++++
.../json/JSONCreateFunctionMessage.java | 81 ++++++
.../messaging/json/JSONCreateIndexMessage.java | 82 ++++++
.../messaging/json/JSONCreateTableMessage.java | 100 +++++++
.../messaging/json/JSONDropDatabaseMessage.java | 71 +++++
.../messaging/json/JSONDropFunctionMessage.java | 81 ++++++
.../messaging/json/JSONDropIndexMessage.java | 82 ++++++
.../json/JSONDropPartitionMessage.java | 97 +++++++
.../messaging/json/JSONDropTableMessage.java | 86 +++++++
.../messaging/json/JSONInsertMessage.java | 98 +++++++
.../messaging/json/JSONMessageDeserializer.java | 189 ++++++++++++++
.../messaging/json/JSONMessageFactory.java | 258 +++++++++++++++++++
38 files changed, 3147 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 9e4c590..9a5d604 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -250,6 +250,7 @@ public class HiveConf extends Configuration {
HiveConf.ConfVars.METASTORE_TRANSACTIONAL_EVENT_LISTENERS,
HiveConf.ConfVars.METASTORE_EVENT_CLEAN_FREQ,
HiveConf.ConfVars.METASTORE_EVENT_EXPIRY_DURATION,
+ HiveConf.ConfVars.METASTORE_EVENT_MESSAGE_FACTORY,
HiveConf.ConfVars.METASTORE_FILTER_HOOK,
HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL,
HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS,
@@ -794,6 +795,9 @@ public class HiveConf extends Configuration {
METASTORE_EVENT_EXPIRY_DURATION("hive.metastore.event.expiry.duration", "0s",
new TimeValidator(TimeUnit.SECONDS),
"Duration after which events expire from events table"),
+ METASTORE_EVENT_MESSAGE_FACTORY("hive.metastore.event.message.factory",
+ "org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory",
+ "Factory class for making encoding and decoding messages in the events generated."),
METASTORE_EXECUTE_SET_UGI("hive.metastore.execute.setugi", true,
"In unsecure mode, setting this property to true will cause the metastore to execute DFS operations using \n" +
"the client's reported user and group permissions. Note that this property must be set on \n" +
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index ea7520d..494d01f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -17,41 +17,42 @@
*/
package org.apache.hive.hcatalog.listener;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
-import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
-import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
-import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
-import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.RawStore;
import org.apache.hadoop.hive.metastore.RawStoreProxy;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterIndexEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.ConfigChangeEvent;
import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateFunctionEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropFunctionEvent;
+import org.apache.hadoop.hive.metastore.events.DropIndexEvent;
import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.events.InsertEvent;
import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
-import org.apache.hive.hcatalog.common.HCatConstants;
-import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.TimeUnit;
-
/**
* An implementation of {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
* stores events in the database.
@@ -121,10 +122,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param tableEvent table event.
* @throws MetaException
*/
- public void onCreateTable (CreateTableEvent tableEvent) throws MetaException {
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
Table t = tableEvent.getTable();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.CREATE_TABLE.toString(), msgFactory
+ .buildCreateTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
enqueue(event);
@@ -134,10 +136,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param tableEvent table event.
* @throws MetaException
*/
- public void onDropTable (DropTableEvent tableEvent) throws MetaException {
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
Table t = tableEvent.getTable();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.DROP_TABLE.toString(), msgFactory
+ .buildDropTableMessage(t).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
enqueue(event);
@@ -147,12 +150,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param tableEvent alter table event
* @throws MetaException
*/
- public void onAlterTable (AlterTableEvent tableEvent) throws MetaException {
+ public void onAlterTable(AlterTableEvent tableEvent) throws MetaException {
Table before = tableEvent.getOldTable();
Table after = tableEvent.getNewTable();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_ALTER_TABLE_EVENT,
- msgFactory.buildAlterTableMessage(before, after).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.ALTER_TABLE.toString(), msgFactory
+ .buildAlterTableMessage(before, after).toString());
event.setDbName(after.getDbName());
event.setTableName(after.getTableName());
enqueue(event);
@@ -162,12 +165,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param partitionEvent partition event
* @throws MetaException
*/
- public void onAddPartition (AddPartitionEvent partitionEvent)
- throws MetaException {
+ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
Table t = partitionEvent.getTable();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_ADD_PARTITION_EVENT,
- msgFactory.buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+ String msg = msgFactory
+ .buildAddPartitionMessage(t, partitionEvent.getPartitionIterator()).toString();
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.ADD_PARTITION.toString(), msg);
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
enqueue(event);
@@ -177,11 +180,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param partitionEvent partition event
* @throws MetaException
*/
- public void onDropPartition (DropPartitionEvent partitionEvent) throws MetaException {
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
Table t = partitionEvent.getTable();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_DROP_PARTITION_EVENT,
- msgFactory.buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.DROP_PARTITION.toString(), msgFactory
+ .buildDropPartitionMessage(t, partitionEvent.getPartitionIterator()).toString());
event.setDbName(t.getDbName());
event.setTableName(t.getTableName());
enqueue(event);
@@ -191,12 +194,12 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param partitionEvent partition event
* @throws MetaException
*/
- public void onAlterPartition (AlterPartitionEvent partitionEvent) throws MetaException {
+ public void onAlterPartition(AlterPartitionEvent partitionEvent) throws MetaException {
Partition before = partitionEvent.getOldPartition();
Partition after = partitionEvent.getNewPartition();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_ALTER_PARTITION_EVENT,
- msgFactory.buildAlterPartitionMessage(partitionEvent.getTable(),before, after).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.ALTER_PARTITION.toString(), msgFactory
+ .buildAlterPartitionMessage(partitionEvent.getTable(), before, after).toString());
event.setDbName(before.getDbName());
event.setTableName(before.getTableName());
enqueue(event);
@@ -206,11 +209,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param dbEvent database event
* @throws MetaException
*/
- public void onCreateDatabase (CreateDatabaseEvent dbEvent) throws MetaException {
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_CREATE_DATABASE_EVENT,
- msgFactory.buildCreateDatabaseMessage(db).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.CREATE_DATABASE.toString(), msgFactory
+ .buildCreateDatabaseMessage(db).toString());
event.setDbName(db.getName());
enqueue(event);
}
@@ -219,11 +222,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param dbEvent database event
* @throws MetaException
*/
- public void onDropDatabase (DropDatabaseEvent dbEvent) throws MetaException {
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
Database db = dbEvent.getDatabase();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_DROP_DATABASE_EVENT,
- msgFactory.buildDropDatabaseMessage(db).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.DROP_DATABASE.toString(), msgFactory
+ .buildDropDatabaseMessage(db).toString());
event.setDbName(db.getName());
enqueue(event);
}
@@ -232,11 +235,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param fnEvent function event
* @throws MetaException
*/
- public void onCreateFunction (CreateFunctionEvent fnEvent) throws MetaException {
+ public void onCreateFunction(CreateFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_CREATE_FUNCTION_EVENT,
- msgFactory.buildCreateFunctionMessage(fn).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.CREATE_FUNCTION.toString(), msgFactory
+ .buildCreateFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
enqueue(event);
}
@@ -245,11 +248,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param fnEvent function event
* @throws MetaException
*/
- public void onDropFunction (DropFunctionEvent fnEvent) throws MetaException {
+ public void onDropFunction(DropFunctionEvent fnEvent) throws MetaException {
Function fn = fnEvent.getFunction();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_DROP_FUNCTION_EVENT,
- msgFactory.buildDropFunctionMessage(fn).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.DROP_FUNCTION.toString(), msgFactory
+ .buildDropFunctionMessage(fn).toString());
event.setDbName(fn.getDbName());
enqueue(event);
}
@@ -258,11 +261,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param indexEvent index event
* @throws MetaException
*/
- public void onAddIndex (AddIndexEvent indexEvent) throws MetaException {
+ public void onAddIndex(AddIndexEvent indexEvent) throws MetaException {
Index index = indexEvent.getIndex();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_CREATE_INDEX_EVENT,
- msgFactory.buildCreateIndexMessage(index).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.CREATE_INDEX.toString(), msgFactory
+ .buildCreateIndexMessage(index).toString());
event.setDbName(index.getDbName());
enqueue(event);
}
@@ -271,11 +274,11 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param indexEvent index event
* @throws MetaException
*/
- public void onDropIndex (DropIndexEvent indexEvent) throws MetaException {
+ public void onDropIndex(DropIndexEvent indexEvent) throws MetaException {
Index index = indexEvent.getIndex();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_DROP_INDEX_EVENT,
- msgFactory.buildDropIndexMessage(index).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.DROP_INDEX.toString(), msgFactory
+ .buildDropIndexMessage(index).toString());
event.setDbName(index.getDbName());
enqueue(event);
}
@@ -284,21 +287,22 @@ public class DbNotificationListener extends MetaStoreEventListener {
* @param indexEvent index event
* @throws MetaException
*/
- public void onAlterIndex (AlterIndexEvent indexEvent) throws MetaException {
+ public void onAlterIndex(AlterIndexEvent indexEvent) throws MetaException {
Index before = indexEvent.getOldIndex();
Index after = indexEvent.getNewIndex();
- NotificationEvent event = new NotificationEvent(0, now(),
- HCatConstants.HCAT_ALTER_INDEX_EVENT,
- msgFactory.buildAlterIndexMessage(before, after).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.ALTER_INDEX.toString(), msgFactory
+ .buildAlterIndexMessage(before, after).toString());
event.setDbName(before.getDbName());
enqueue(event);
}
@Override
public void onInsert(InsertEvent insertEvent) throws MetaException {
- NotificationEvent event = new NotificationEvent(0, now(), HCatConstants.HCAT_INSERT_EVENT,
- msgFactory.buildInsertMessage(insertEvent.getDb(), insertEvent.getTable(),
- insertEvent.getPartitionKeyValues(), insertEvent.getFiles()).toString());
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.INSERT.toString(), msgFactory.buildInsertMessage(
+ insertEvent.getDb(), insertEvent.getTable(), insertEvent.getPartitionKeyValues(),
+ insertEvent.getFiles()).toString());
event.setDbName(insertEvent.getDb());
event.setTableName(insertEvent.getTable());
enqueue(event);
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
index a661962..775c1d6 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/api/TestHCatClientNotification.java
@@ -18,28 +18,31 @@
*/
package org.apache.hive.hcatalog.api;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
-import junit.framework.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hive.hcatalog.common.HCatConstants;
import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hive.hcatalog.listener.DbNotificationListener;
-import org.apache.hive.hcatalog.messaging.HCatEventMessage;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -125,8 +128,14 @@ public class TestHCatClientNotification {
assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
assertEquals("hcatcreatetable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"hcatcreatetable\",\"timestamp\":[0-9]+}"));
+
+ // Parse the message field
+ ObjectNode jsonTree = getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("hcatcreatetable", jsonTree.get("table").asText());
+ Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+ assertEquals(table.toHiveTable(), tableObj);
}
// TODO - Currently no way to test alter table, as this interface doesn't support alter table
@@ -166,11 +175,8 @@ public class TestHCatClientNotification {
String partName = "testpart";
Map<String, String> partSpec = new HashMap<String, String>(1);
partSpec.put(partColName, partName);
- hCatClient.addPartition(
- HCatAddPartitionDesc.create(
- new HCatPartition(table, partSpec, null)
- ).build()
- );
+ HCatPartition part = new HCatPartition(table, partSpec, null);
+ hCatClient.addPartition(HCatAddPartitionDesc.create(part).build());
List<HCatNotificationEvent> events = hCatClient.getNextNotification(firstEventId, 0, null);
assertEquals(2, events.size());
@@ -181,9 +187,20 @@ public class TestHCatClientNotification {
assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
assertEquals("default", event.getDbName());
assertEquals(tableName, event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"hcataddparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"pc\":\"testpart\"}]}"));
+
+ // Parse the message field
+ ObjectNode jsonTree = getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("hcataddparttable", jsonTree.get("table").asText());
+ List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+ HCatPartition hcatPart = new HCatPartition(table, partitionObjList.get(0));
+ assertEquals(part.getDatabaseName(), hcatPart.getDatabaseName());
+ assertEquals(part.getTableName(), hcatPart.getTableName());
+ assertEquals(part.getValues(), hcatPart.getValues());
+ assertEquals(part.getColumns(), hcatPart.getColumns());
+ assertEquals(part.getPartColumns(), hcatPart.getPartColumns());
+ assertEquals(part.getLocation(), hcatPart.getLocation());
}
// TODO - currently no way to test alter partition, as HCatClient doesn't support it.
@@ -265,4 +282,10 @@ public class TestHCatClientNotification {
assertEquals(1, events.size());
assertEquals(firstEventId + 1, events.get(0).getEventId());
}
+
+ private ObjectNode getJsonTree(HCatNotificationEvent event) throws Exception {
+ JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(jsonParser, ObjectNode.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 4f97cf4..690616d 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -18,25 +18,19 @@
*/
package org.apache.hive.hcatalog.listener;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.FunctionType;
-import org.apache.hadoop.hive.metastore.api.Index;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.PrincipalType;
-import org.apache.hadoop.hive.metastore.api.ResourceType;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.htrace.fasterxml.jackson.core.JsonFactory;
-import org.apache.htrace.fasterxml.jackson.core.JsonParser;
-import org.apache.htrace.fasterxml.jackson.databind.JsonNode;
-import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.htrace.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.thrift.TDeserializer;
-import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -45,29 +39,37 @@ import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FireEventRequest;
import org.apache.hadoop.hive.metastore.api.FireEventRequestData;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.FunctionType;
+import org.apache.hadoop.hive.metastore.api.Index;
import org.apache.hadoop.hive.metastore.api.InsertEventRequestData;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.ResourceType;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ObjectNode;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class TestDbNotificationListener {
private static final Logger LOG = LoggerFactory.getLogger(TestDbNotificationListener.class.getName());
private static final int EVENTS_TTL = 30;
@@ -183,26 +185,34 @@ public class TestDbNotificationListener {
List<FieldSchema> cols = new ArrayList<FieldSchema>();
cols.add(new FieldSchema("col1", "int", "nocomment"));
SerDeInfo serde = new SerDeInfo("serde", "seriallib", null);
- StorageDescriptor sd = new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0,
- serde, null, null, emptyParameters);
- Table table = new Table("mytable", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ StorageDescriptor sd =
+ new StorageDescriptor(cols, "file:/tmp", "input", "output", false, 0, serde, null, null,
+ emptyParameters);
+ Table table =
+ new Table("mytable", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
msClient.createTable(table);
-
+ // Get the event
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
assertEquals(1, rsp.getEventsSize());
-
NotificationEvent event = rsp.getEvents().get(0);
assertEquals(firstEventId + 1, event.getEventId());
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, event.getEventType());
assertEquals("default", event.getDbName());
assertEquals("mytable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"CREATE_TABLE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"mytable\",\"timestamp\":[0-9]+}"));
- table = new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null,
- emptyParameters, null, null, null);
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_CREATE_TABLE_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("mytable", jsonTree.get("table").asText());
+ Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+ assertEquals(table, tableObj);
+
+ table =
+ new Table("mytable2", "default", "me", startTime, startTime, 0, sd, null, emptyParameters,
+ null, null, null);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
msClient.createTable(table);
@@ -223,11 +233,12 @@ public class TestDbNotificationListener {
serde, null, null, emptyParameters);
Table table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+ // Event 1
msClient.createTable(table);
-
cols.add(new FieldSchema("col2", "int", ""));
table = new Table("alttable", "default", "me", startTime, startTime, 0, sd,
new ArrayList<FieldSchema>(), emptyParameters, null, null, null);
+ // Event 2
msClient.alter_table("default", "alttable", table);
NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
@@ -239,9 +250,14 @@ public class TestDbNotificationListener {
assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, event.getEventType());
assertEquals("default", event.getDbName());
assertEquals("alttable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"ALTER_TABLE\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alttable\"," +
- "\"timestamp\":[0-9]+}"));
+
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_ALTER_TABLE_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("alttable", jsonTree.get("table").asText());
+ Table tableObj = JSONMessageFactory.getTableObj(jsonTree);
+ assertEquals(table, tableObj);
DummyRawStoreFailEvent.setEventSucceed(false);
try {
@@ -319,9 +335,14 @@ public class TestDbNotificationListener {
assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, event.getEventType());
assertEquals("default", event.getDbName());
assertEquals("addparttable", event.getTableName());
- assertTrue(event.getMessage().matches("\\{\"eventType\":\"ADD_PARTITION\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":" +
- "\"addparttable\",\"timestamp\":[0-9]+,\"partitions\":\\[\\{\"ds\":\"today\"}]}"));
+
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_ADD_PARTITION_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("addparttable", jsonTree.get("table").asText());
+ List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+ assertEquals(partition, partitionObjList.get(0));
partition = new Partition(Arrays.asList("tomorrow"), "default", "tableDoesNotExist",
startTime, startTime, sd, emptyParameters);
@@ -365,10 +386,14 @@ public class TestDbNotificationListener {
assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, event.getEventType());
assertEquals("default", event.getDbName());
assertEquals("alterparttable", event.getTableName());
- assertTrue(event.getMessage(),
- event.getMessage().matches("\\{\"eventType\":\"ALTER_PARTITION\",\"server\":\"\"," +
- "\"servicePrincipal\":\"\",\"db\":\"default\",\"table\":\"alterparttable\"," +
- "\"timestamp\":[0-9]+,\"keyValues\":\\{\"ds\":\"today\"}}"));
+
+ // Parse the message field
+ ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
+ assertEquals(HCatConstants.HCAT_ALTER_PARTITION_EVENT, jsonTree.get("eventType").asText());
+ assertEquals("default", jsonTree.get("db").asText());
+ assertEquals("alterparttable", jsonTree.get("table").asText());
+ List<Partition> partitionObjList = JSONMessageFactory.getPartitionObjList(jsonTree);
+ assertEquals(newPart, partitionObjList.get(0));
DummyRawStoreFailEvent.setEventSucceed(false);
try {
@@ -445,7 +470,7 @@ public class TestDbNotificationListener {
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_CREATE_FUNCTION_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
- Function funcObj = getFunctionObj(getJsonTree(event));
+ Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, funcObj.getDbName());
assertEquals(funcName, funcObj.getFunctionName());
assertEquals(funcClass, funcObj.getClassName());
@@ -488,7 +513,7 @@ public class TestDbNotificationListener {
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_DROP_FUNCTION_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
- Function funcObj = getFunctionObj(getJsonTree(event));
+ Function funcObj = JSONMessageFactory.getFunctionObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, funcObj.getDbName());
assertEquals(funcName, funcObj.getFunctionName());
assertEquals(funcClass, funcObj.getClassName());
@@ -542,7 +567,7 @@ public class TestDbNotificationListener {
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_CREATE_INDEX_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
- Index indexObj = getIndexObj(getJsonTree(event));
+ Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName, indexObj.getIndexName());
assertEquals(tableName, indexObj.getOrigTableName());
@@ -593,7 +618,7 @@ public class TestDbNotificationListener {
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_DROP_INDEX_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
- Index indexObj = getIndexObj(getJsonTree(event));
+ Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event));
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName.toLowerCase(), indexObj.getIndexName());
assertEquals(tableName.toLowerCase(), indexObj.getOrigTableName());
@@ -647,7 +672,7 @@ public class TestDbNotificationListener {
assertTrue(event.getEventTime() >= startTime);
assertEquals(HCatConstants.HCAT_ALTER_INDEX_EVENT, event.getEventType());
assertEquals(dbName, event.getDbName());
- Index indexObj = getIndexObj(getJsonTree(event), "afterIndexObjJson");
+ Index indexObj = JSONMessageFactory.getIndexObj(JSONMessageFactory.getJsonTree(event), "afterIndexObjJson");
assertEquals(dbName, indexObj.getDbName());
assertEquals(indexName, indexObj.getIndexName());
assertEquals(tableName, indexObj.getOrigTableName());
@@ -971,30 +996,4 @@ public class TestDbNotificationListener {
LOG.info("second trigger done");
assertEquals(0, rsp2.getEventsSize());
}
-
- private ObjectNode getJsonTree(NotificationEvent event) throws Exception {
- JsonParser jsonParser = (new JsonFactory()).createJsonParser(event.getMessage());
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(jsonParser, ObjectNode.class);
- }
-
- private Function getFunctionObj(JsonNode jsonTree) throws Exception {
- TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
- Function funcObj = new Function();
- String tableJson = jsonTree.get("functionObjJson").asText();
- deSerializer.deserialize(funcObj, tableJson, "UTF-8");
- return funcObj;
- }
-
- private Index getIndexObj(JsonNode jsonTree) throws Exception {
- return getIndexObj(jsonTree, "indexObjJson");
- }
-
- private Index getIndexObj(JsonNode jsonTree, String indexObjKey) throws Exception {
- TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory());
- Index indexObj = new Index();
- String tableJson = jsonTree.get(indexObjKey).asText();
- deSerializer.deserialize(indexObj, tableJson, "UTF-8");
- return indexObj;
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
new file mode 100644
index 0000000..26898f2
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AddPartitionMessage.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class AddPartitionMessage extends EventMessage {
+
+ protected AddPartitionMessage() {
+ super(EventType.ADD_PARTITION);
+ }
+
+ /**
+ * Getter for the name of the table to which the partitions are added.
+ * @return Table-name (String).
+ */
+ public abstract String getTable();
+
+ /**
+ * Getter for list of partitions added.
+ * @return List of maps, one per added partition; each map gives the value of every partition-key for that partition.
+ */
+ public abstract List<Map<String, String>> getPartitions ();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null)
+ throw new IllegalStateException("Table name unset.");
+ if (getPartitions() == null)
+ throw new IllegalStateException("Partition-list unset.");
+ return super.checkValid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
new file mode 100644
index 0000000..0fc7f9e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class AlterIndexMessage extends EventMessage {
+
+ protected AlterIndexMessage() {
+ super(EventType.ALTER_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
new file mode 100644
index 0000000..d89dba1
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterPartitionMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.Map;
+
+public abstract class AlterPartitionMessage extends EventMessage {
+
+ protected AlterPartitionMessage() {
+ super(EventType.ALTER_PARTITION);
+ }
+
+ public abstract String getTable();
+
+ public abstract Map<String,String> getKeyValues();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null) throw new IllegalStateException("Table name unset.");
+ if (getKeyValues() == null) throw new IllegalStateException("Partition values unset");
+ return super.checkValid();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
new file mode 100644
index 0000000..99e678a
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/AlterTableMessage.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class AlterTableMessage extends EventMessage {
+
+ protected AlterTableMessage() {
+ super(EventType.ALTER_TABLE);
+ }
+
+ public abstract String getTable();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null) throw new IllegalStateException("Table name unset.");
+ return super.checkValid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
new file mode 100644
index 0000000..7614298
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateDatabaseMessage.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateDatabaseMessage extends EventMessage {
+
+ protected CreateDatabaseMessage() {
+ super(EventType.CREATE_DATABASE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
new file mode 100644
index 0000000..867e8ec
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateFunctionMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateFunctionMessage extends EventMessage {
+
+ protected CreateFunctionMessage() {
+ super(EventType.CREATE_FUNCTION);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
new file mode 100644
index 0000000..81676aa
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateIndexMessage extends EventMessage {
+
+ protected CreateIndexMessage() {
+ super(EventType.CREATE_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
new file mode 100644
index 0000000..c88c59c
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/CreateTableMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class CreateTableMessage extends EventMessage {
+
+ protected CreateTableMessage() {
+ super(EventType.CREATE_TABLE);
+ }
+
+ /**
+ * Getter for the name of the table created in the metastore.
+ * @return Table-name (String).
+ */
+ public abstract String getTable();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null)
+ throw new IllegalStateException("Table name unset.");
+ return super.checkValid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
new file mode 100644
index 0000000..fa6da38
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropDatabaseMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropDatabaseMessage extends EventMessage {
+
+ protected DropDatabaseMessage() {
+ super(EventType.DROP_DATABASE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
new file mode 100644
index 0000000..82cdc44
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropFunctionMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropFunctionMessage extends EventMessage {
+
+ protected DropFunctionMessage() {
+ super(EventType.DROP_FUNCTION);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
new file mode 100644
index 0000000..ce7b760
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropIndexMessage.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropIndexMessage extends EventMessage {
+
+ protected DropIndexMessage() {
+ super(EventType.DROP_INDEX);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
new file mode 100644
index 0000000..26aecb3
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropPartitionMessage.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class DropPartitionMessage extends EventMessage {
+
+ protected DropPartitionMessage() {
+ super(EventType.DROP_PARTITION);
+ }
+
+ public abstract String getTable();
+ public abstract List<Map<String, String>> getPartitions ();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null)
+ throw new IllegalStateException("Table name unset.");
+ if (getPartitions() == null)
+ throw new IllegalStateException("Partition-list unset.");
+ return super.checkValid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
new file mode 100644
index 0000000..64a8cc5
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/DropTableMessage.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+public abstract class DropTableMessage extends EventMessage {
+
+ protected DropTableMessage() {
+ super(EventType.DROP_TABLE);
+ }
+
+ /**
+ * Getter for the name of the table being dropped.
+ * @return Table-name (String).
+ */
+ public abstract String getTable();
+
+ @Override
+ public EventMessage checkValid() {
+ if (getTable() == null)
+ throw new IllegalStateException("Table name unset.");
+ return super.checkValid();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
new file mode 100644
index 0000000..1ec0de0
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+/**
+ * Base class for the messages emitted when Metastore operations are done
+ * (e.g. creation and deletion of databases, tables and partitions).
+ */
+public abstract class EventMessage {
+
+  /** Enumeration of all supported types of Metastore operations. */
+  public enum EventType {
+    CREATE_DATABASE(MessageFactory.CREATE_DATABASE_EVENT),
+    DROP_DATABASE(MessageFactory.DROP_DATABASE_EVENT),
+    CREATE_TABLE(MessageFactory.CREATE_TABLE_EVENT),
+    DROP_TABLE(MessageFactory.DROP_TABLE_EVENT),
+    ADD_PARTITION(MessageFactory.ADD_PARTITION_EVENT),
+    DROP_PARTITION(MessageFactory.DROP_PARTITION_EVENT),
+    ALTER_TABLE(MessageFactory.ALTER_TABLE_EVENT),
+    ALTER_PARTITION(MessageFactory.ALTER_PARTITION_EVENT),
+    INSERT(MessageFactory.INSERT_EVENT),
+    CREATE_FUNCTION(MessageFactory.CREATE_FUNCTION_EVENT),
+    DROP_FUNCTION(MessageFactory.DROP_FUNCTION_EVENT),
+    CREATE_INDEX(MessageFactory.CREATE_INDEX_EVENT),
+    DROP_INDEX(MessageFactory.DROP_INDEX_EVENT),
+    ALTER_INDEX(MessageFactory.ALTER_INDEX_EVENT);
+
+    // Label taken from the matching MessageFactory constant; returned by toString().
+    private final String typeString;
+
+    EventType(String typeString) {
+      this.typeString = typeString;
+    }
+
+    @Override
+    public String toString() {
+      return typeString;
+    }
+  }
+
+  protected EventType eventType;
+
+  protected EventMessage(EventType eventType) {
+    this.eventType = eventType;
+  }
+
+  /** Returns the type of Metastore operation this message describes. */
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  /**
+   * Getter for HCatalog Server's URL (this is where the event originates from).
+   * @return HCatalog Server's URL (String).
+   */
+  public abstract String getServer();
+
+  /** Returns the Kerberos principal of the HCatalog service (String). */
+  public abstract String getServicePrincipal();
+
+  /** Returns the name of the Database on which the Metastore operation is done (String). */
+  public abstract String getDB();
+
+  /**
+   * Getter for the timestamp associated with the operation.
+   * @return Timestamp (Long - seconds since epoch).
+   */
+  public abstract Long getTimestamp();
+
+  /**
+   * Class invariant. Checked after construction or deserialization.
+   * @return this message, when server, service principal, event type and DB name are all set.
+   * @throws IllegalStateException if the server, service principal or event type is null.
+   * @throws IllegalArgumentException if the DB name is null.
+   */
+  public EventMessage checkValid() {
+    if (getServer() == null || getServicePrincipal() == null) {
+      throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null.");
+    }
+    if (getEventType() == null) {
+      throw new IllegalStateException("Event-type unset.");
+    }
+    if (getDB() == null) {
+      throw new IllegalArgumentException("DB-name unset.");
+    }
+
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
new file mode 100644
index 0000000..932af7e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hive.metastore.messaging;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class EventUtils {
+
+  /**
+   * Builds a notification filter that matches events by db name and, optionally, table name.
+   * Name comparisons ignore case; a null event is never accepted.
+   * <ul>
+   * <li>dbName == null: every event is accepted (tableName is not consulted).</li>
+   * <li>dbName != null, tableName == null: every event of that db is accepted.</li>
+   * <li>dbName != null, tableName != null: only events of that table are accepted.</li>
+   * </ul>
+   * @param dbName db to match, or null to accept all warehouse events
+   * @param tableName table to match within dbName, or null to accept all events of the db
+   * @return filter implementing the rules above
+   */
+  public static IMetaStoreClient.NotificationFilter getDbTblNotificationFilter(
+      final String dbName, final String tableName) {
+    return new IMetaStoreClient.NotificationFilter() {
+      @Override
+      public boolean accept(NotificationEvent event) {
+        if (event == null) {
+          return false; // trivial case first, so that non-null can be assumed below
+        }
+        if (dbName == null) {
+          return true; // no db restriction: all warehouse events are of interest
+        }
+        if (!dbName.equalsIgnoreCase(event.getDbName())) {
+          return false; // event belongs to some other db
+        }
+        // Same db: accept db-wide when no table was requested, else require a table match.
+        return tableName == null
+            || tableName.equalsIgnoreCase(event.getTableName());
+      }
+    };
+  }
+
+  /** Source of notification events: a batch size, the current event id, and filtered fetches. */
+  public interface NotificationFetcher {
+    int getBatchSize() throws IOException;
+    long getCurrentNotificationEventId() throws IOException;
+    List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException;
+  }
+
+  /** {@link NotificationFetcher} backed by an {@link IMetaStoreClient}. */
+  public static class MSClientNotificationFetcher implements NotificationFetcher {
+    private final IMetaStoreClient msc;
+    private Integer batchSize = null; // read from the metastore on first use, then cached
+
+    public MSClientNotificationFetcher(IMetaStoreClient msc) {
+      this.msc = msc;
+    }
+
+    @Override
+    public int getBatchSize() throws IOException {
+      if (batchSize != null) {
+        return batchSize;
+      }
+      // TODO: this asks the metastore for its value of the setting, on the assumption that the
+      // metastore is more likely to have a reasonable config than an arbitrary client; we may
+      // want to revisit and pull it from the client side instead.
+      final String varName = HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX.varname;
+      try {
+        batchSize = Integer.parseInt(msc.getConfigValue(varName, "50"));
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+      return batchSize;
+    }
+
+    @Override
+    public long getCurrentNotificationEventId() throws IOException {
+      try {
+        return msc.getCurrentNotificationEventId().getEventId();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public List<NotificationEvent> getNextNotificationEvents(
+        long pos, IMetaStoreClient.NotificationFilter filter) throws IOException {
+      final int maxEvents = getBatchSize(); // its IOException propagates unchanged
+      try {
+        return msc.getNextNotification(pos, maxEvents, filter).getEvents();
+      } catch (TException e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+ public static class NotificationEventIterator implements Iterator<NotificationEvent> {
+
+ private NotificationFetcher nfetcher;
+ private IMetaStoreClient.NotificationFilter filter;
+ private int maxEvents;
+
+ private Iterator<NotificationEvent> batchIter = null;
+ private List<NotificationEvent> batch = null;
+ private long pos;
+ private long maxPos;
+ private int eventCount;
+
+ public NotificationEventIterator(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ String dbName, String tableName) throws IOException {
+ init(nfetcher, eventFrom, maxEvents, EventUtils.getDbTblNotificationFilter(dbName, tableName));
+ // using init(..) instead of this(..) because the EventUtils.getDbTblNotificationFilter
+ // is an operation that needs to run before delegating to the other ctor, and this messes up chaining
+ // ctors
+ }
+
+ public NotificationEventIterator(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter) throws IOException {
+ init(nfetcher,eventFrom,maxEvents,filter);
+ }
+
+ private void init(
+ NotificationFetcher nfetcher, long eventFrom, int maxEvents,
+ IMetaStoreClient.NotificationFilter filter) throws IOException {
+ this.nfetcher = nfetcher;
+ this.filter = filter;
+ this.pos = eventFrom;
+ if (maxEvents < 1){
+ // 0 or -1 implies fetch everything
+ this.maxEvents = Integer.MAX_VALUE;
+ } else {
+ this.maxEvents = maxEvents;
+ }
+
+ this.eventCount = 0;
+ this.maxPos = nfetcher.getCurrentNotificationEventId();
+ }
+
+ private void fetchNextBatch() throws IOException {
+ batch = nfetcher.getNextNotificationEvents(pos, filter);
+ int batchSize = nfetcher.getBatchSize();
+ while ( ((batch == null) || (batch.isEmpty())) && (pos < maxPos) ){
+ // no valid events this batch, but we're still not done processing events
+ pos += batchSize;
+ batch = nfetcher.getNextNotificationEvents(pos,filter);
+ }
+
+ if (batch == null){
+ batch = new ArrayList<NotificationEvent>();
+ // instantiate empty list so that we don't error out on iterator fetching.
+ // If we're here, then the next check of pos will show our caller that
+ // that we've exhausted our event supply
+ }
+ batchIter = batch.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (eventCount >= maxEvents){
+ // If we've already satisfied the number of events we were supposed to deliver, we end it.
+ return false;
+ }
+ if ((batchIter != null) && (batchIter.hasNext())){
+ // If we have a valid batchIter and it has more elements, return them.
+ return true;
+ }
+ // If we're here, we want more events, and either batchIter is null, or batchIter
+ // has reached the end of the current batch. Let's fetch the next batch.
+ try {
+ fetchNextBatch();
+ } catch (IOException e) {
+ // Regrettable that we have to wrap the IOException into a RuntimeException,
+ // but throwing the exception is the appropriate result here, and hasNext()
+ // signature will only allow RuntimeExceptions. Iterator.hasNext() really
+ // should have allowed IOExceptions
+ throw new RuntimeException(e);
+ }
+ // New batch has been fetched. If it's not empty, we have more elements to process.
+ return !batch.isEmpty();
+ }
+
+ @Override
+ public NotificationEvent next() {
+ eventCount++;
+ NotificationEvent ev = batchIter.next();
+ pos = ev.getEventId();
+ return ev;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove() not supported on NotificationEventIterator");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
new file mode 100644
index 0000000..fe747df
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/** Event message sent when an insert is done to a table or partition. */
+public abstract class InsertMessage extends EventMessage {
+
+  protected InsertMessage() {
+    super(EventType.INSERT);
+  }
+
+  /**
+   * Returns the name of the table being inserted into.
+   * @return Table-name (String).
+   */
+  public abstract String getTable();
+
+  /**
+   * Returns the map of partition keyvalues. Will be null if this insert is to a table and
+   * not a partition.
+   * @return Map of partition keyvalues, or null.
+   */
+  public abstract Map<String,String> getPartitionKeyValues();
+
+  /**
+   * Returns the list of files created as a result of this DML operation. May be null.
+   * @return List of new files, or null.
+   */
+  public abstract List<String> getFiles();
+
+  /** Rejects a message whose table name is unset, then runs the inherited checks. */
+  @Override
+  public EventMessage checkValid() {
+    if (getTable() == null) {
+      throw new IllegalStateException("Table name unset.");
+    }
+    return super.checkValid();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/3dd28fbb/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
new file mode 100644
index 0000000..515c455
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+/**
+ * Interface for converting HCat events from String-form back to EventMessage instances.
+ */
+public abstract class MessageDeserializer {
+
+ /**
+ * Method to construct EventMessage from string.
+ */
+ public EventMessage getEventMessage(String eventTypeString, String messageBody) {
+
+ switch (EventMessage.EventType.valueOf(eventTypeString)) {
+ case CREATE_DATABASE:
+ return getCreateDatabaseMessage(messageBody);
+ case DROP_DATABASE:
+ return getDropDatabaseMessage(messageBody);
+ case CREATE_TABLE:
+ return getCreateTableMessage(messageBody);
+ case ALTER_TABLE:
+ return getAlterTableMessage(messageBody);
+ case DROP_TABLE:
+ return getDropTableMessage(messageBody);
+ case ADD_PARTITION:
+ return getAddPartitionMessage(messageBody);
+ case ALTER_PARTITION:
+ return getAlterPartitionMessage(messageBody);
+ case DROP_PARTITION:
+ return getDropPartitionMessage(messageBody);
+ case CREATE_FUNCTION:
+ return getCreateFunctionMessage(messageBody);
+ case DROP_FUNCTION:
+ return getDropFunctionMessage(messageBody);
+ case CREATE_INDEX:
+ return getCreateIndexMessage(messageBody);
+ case DROP_INDEX:
+ return getDropIndexMessage(messageBody);
+ case ALTER_INDEX:
+ return getAlterIndexMessage(messageBody);
+ case INSERT:
+ return getInsertMessage(messageBody);
+
+ default:
+ throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+ }
+ }
+
+ /**
+ * Method to de-serialize CreateDatabaseMessage instance.
+ */
+ public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropDatabaseMessage instance.
+ */
+ public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody);
+
+ /**
+ * Method to de-serialize CreateTableMessage instance.
+ */
+ public abstract CreateTableMessage getCreateTableMessage(String messageBody);
+
+ /**
+ * Method to de-serialize AlterTableMessge
+ * @param messageBody string message
+ * @return object message
+ */
+ public abstract AlterTableMessage getAlterTableMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropTableMessage instance.
+ */
+ public abstract DropTableMessage getDropTableMessage(String messageBody);
+
+ /**
+ * Method to de-serialize AddPartitionMessage instance.
+ */
+ public abstract AddPartitionMessage getAddPartitionMessage(String messageBody);
+
+ /**
+ * Method to deserialize AlterPartitionMessage
+ * @param messageBody the message in serialized form
+ * @return message in object form
+ */
+ public abstract AlterPartitionMessage getAlterPartitionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropPartitionMessage instance.
+ */
+ public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize CreateFunctionMessage instance.
+ */
+ public abstract CreateFunctionMessage getCreateFunctionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropFunctionMessage instance.
+ */
+ public abstract DropFunctionMessage getDropFunctionMessage(String messageBody);
+
+ /**
+ * Method to de-serialize CreateIndexMessage instance.
+ */
+ public abstract CreateIndexMessage getCreateIndexMessage(String messageBody);
+
+ /**
+ * Method to de-serialize DropIndexMessage instance.
+ */
+ public abstract DropIndexMessage getDropIndexMessage(String messageBody);
+
+ /**
+ * Method to de-serialize AlterIndexMessage instance.
+ */
+ public abstract AlterIndexMessage getAlterIndexMessage(String messageBody);
+
+ /**
+ * Method to deserialize InsertMessage
+ * @param messageBody the message in serialized form
+ * @return message in object form
+ */
+ public abstract InsertMessage getInsertMessage(String messageBody);
+
+ // Protection against construction.
+ protected MessageDeserializer() {}
+}