You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC

svn commit: r1520466 [16/18] - in /hive/trunk/hcatalog: core/src/main/java/org/apache/hcatalog/cli/ core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/ core/src/main/java/org/apache/hcatalog/common/ core/src/main/java/org/apache/hcatalog/data/...

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
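+/**
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener} that
+ * publishes metastore events as JMS messages on three kinds of topics.
+ * Add/drop partition messages are sent on the topic named by the table's
+ * {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} parameter, which is set at table
+ * creation to "&lt;prefix&gt;.&lt;dbname&gt;.&lt;tablename&gt;" (a finalize_partition
+ * message is planned for that topic but is not currently sent). Create/drop
+ * table messages are sent on "&lt;prefix&gt;.&lt;dbname&gt;", and create/drop database
+ * messages on the topic prefix itself
+ * ({@link HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX}, defaulting to
+ * {@link HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}). Every message also has a
+ * property named by {@link HCatConstants#HCAT_EVENT} set on it, whose value can
+ * be used to configure a message selector on the subscriber side.
+ */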
+public class NotificationListener extends MetaStoreEventListener {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
+    /** JMS session used to create topics, producers and messages; null until a connection is established. */
+    protected Session session;
+    /** JMS connection; closed in {@link #finalize()} when non-null. */
+    protected Connection conn;
+    /** Factory that builds the {@link HCatEventMessage} payloads sent by this listener. */
+    private static MessageFactory messageFactory = MessageFactory.getInstance();
+
+    /**
+     * Creates the listener and attempts to set up the message bus connection and session.
+     * Connection failures are logged by {@link #createConnection()} rather than thrown, so
+     * construction succeeds even when the message bus is unavailable.
+     * NOTE(review): this calls the overridable {@link #createConnection()} from the
+     * constructor, so a subclass override runs before the subclass is fully initialized.
+     *
+     * @param conf configuration handed to the {@link MetaStoreEventListener} superclass.
+     */
+    public NotificationListener(final Configuration conf) {
+
+        super(conf);
+        createConnection();
+    }
+
+    /**
+     * Looks up the message-bus topic configured for the table that owns a partition.
+     * The table is fetched through the event's handler and the topic is read from its
+     * {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} parameter.
+     *
+     * @param partition      partition whose database and table names identify the table.
+     * @param partitionEvent event supplying the metastore handler used for the lookup.
+     * @return the configured topic name, or null when the table has no parameters or
+     *         no topic parameter.
+     * @throws MetaException if the table cannot be found; the original
+     *                       {@link NoSuchObjectException} is attached as the cause.
+     */
+    private static String getTopicName(Partition partition,
+                                       ListenerEvent partitionEvent) throws MetaException {
+        try {
+            Table table = partitionEvent.getHandler()
+                .get_table(partition.getDbName(), partition.getTableName());
+            if (table.getParameters() == null) {
+                // No parameter map at all: treat it the same as "topic not configured".
+                return null;
+            }
+            return table.getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+        } catch (NoSuchObjectException e) {
+            // Keep the cause so the stack trace is not lost (same pattern as onCreateTable).
+            MetaException me = new MetaException(e.toString());
+            me.initCause(e);
+            throw me;
+        }
+    }
+
+    /**
+     * Publishes an add-partition notification for a successfully added partition.
+     * The message goes to the topic stored in the owning table's
+     * {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} parameter; when that parameter is
+     * missing or empty the notification is suppressed and an informational hint is logged.
+     *
+     * @param partitionEvent the add-partition event reported by the metastore.
+     * @throws MetaException if the owning table cannot be looked up.
+     */
+    @Override
+    public void onAddPartition(AddPartitionEvent partitionEvent)
+        throws MetaException {
+        // Failed operations are never announced.
+        if (!partitionEvent.getStatus()) {
+            return;
+        }
+
+        final Partition added = partitionEvent.getPartition();
+        final String topic = getTopicName(added, partitionEvent);
+        if (topic == null || topic.isEmpty()) {
+            LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+                + added.getDbName()
+                + "."
+                + added.getTableName()
+                + " To enable notifications for this table, please do alter table set properties ("
+                + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+                + "=<dbname>.<tablename>) or whatever you want topic name to be.");
+            return;
+        }
+
+        send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), added), topic);
+    }
+
+    /**
+     * Send dropped partition notifications. The message is sent on the topic stored in the
+     * owning table's {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} parameter; subscribers can
+     * filter with message selector string
+     * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+     * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}.
+     * <br/>
+     * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+     * found to throw NPE when serializing objects that contain null. For this reason we override
+     * some fields in the StorageDescriptor of this notification. This should be fixed after
+     * HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+     *
+     * @param partitionEvent the drop-partition event reported by the metastore.
+     * @throws MetaException if the owning table cannot be looked up.
+     */
+    @Override
+    public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+        if (!partitionEvent.getStatus()) {
+            return;
+        }
+        Partition partition = partitionEvent.getPartition();
+        StorageDescriptor sd = partition.getSd();
+        // Replace possibly-null collections with empty ones. Each nested object is checked
+        // first so that the workaround itself cannot fail with a NullPointerException.
+        if (sd != null) {
+            sd.setBucketCols(new ArrayList<String>());
+            sd.setSortCols(new ArrayList<Order>());
+            sd.setParameters(new HashMap<String, String>());
+            if (sd.getSerdeInfo() != null) {
+                sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+            }
+            if (sd.getSkewedInfo() != null) {
+                sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>());
+            }
+        }
+        String topicName = getTopicName(partition, partitionEvent);
+        if (topicName != null && !topicName.equals("")) {
+            send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName);
+        } else {
+            LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+                + partition.getDbName()
+                + "."
+                + partition.getTableName()
+                + " To enable notifications for this table, please do alter table set properties ("
+                + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+                + "=<dbname>.<tablename>) or whatever you want topic name to be.");
+        }
+    }
+
+    /**
+     * Publishes a create-database notification on the configured topic prefix.
+     * Nothing is sent when the event reports an unsuccessful operation.
+     *
+     * @param dbEvent the create-database event reported by the metastore.
+     * @throws MetaException declared by the listener contract.
+     */
+    @Override
+    public void onCreateDatabase(CreateDatabaseEvent dbEvent)
+        throws MetaException {
+        if (!dbEvent.getStatus()) {
+            return;
+        }
+        // Database-level events share one topic: the prefix from the handler's HiveConf.
+        final HiveConf hiveConf = dbEvent.getHandler().getHiveConf();
+        final String prefixTopic = getTopicPrefix(hiveConf);
+        send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), prefixTopic);
+    }
+
+    /**
+     * Publishes a drop-database notification on the configured topic prefix.
+     * Nothing is sent when the event reports an unsuccessful operation.
+     *
+     * @param dbEvent the drop-database event reported by the metastore.
+     * @throws MetaException declared by the listener contract.
+     */
+    @Override
+    public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+        if (!dbEvent.getStatus()) {
+            return;
+        }
+        // Database-level events share one topic: the prefix from the handler's HiveConf.
+        final HiveConf hiveConf = dbEvent.getHandler().getHiveConf();
+        final String prefixTopic = getTopicPrefix(hiveConf);
+        send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), prefixTopic);
+    }
+
+    /**
+     * Handles a successful table creation in two steps. First the stored table is re-read,
+     * copied, tagged with a {@link HCatConstants#HCAT_MSGBUS_TOPIC_NAME} parameter of the
+     * form prefix.dbname.tablename (names lower-cased) and written back with alter_table.
+     * Then a create-table notification is sent on the topic prefix.dbname.
+     *
+     * @param tableEvent the create-table event reported by the metastore.
+     * @throws MetaException if the table cannot be re-read or altered; the underlying
+     *                       exception is attached as the cause.
+     */
+    @Override
+    public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+        if (!tableEvent.getStatus()) {
+            return;
+        }
+
+        final Table created = tableEvent.getTable();
+        final HMSHandler hms = tableEvent.getHandler();
+        final HiveConf hiveConf = hms.getHiveConf();
+        Table tagged;
+        try {
+            // Work on a copy of the stored table so the event's own object is left untouched.
+            tagged = hms.get_table(created.getDbName(), created.getTableName()).deepCopy();
+            final String tableTopic = getTopicPrefix(hiveConf)
+                + "." + tagged.getDbName().toLowerCase()
+                + "." + tagged.getTableName().toLowerCase();
+            tagged.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, tableTopic);
+            hms.alter_table(tagged.getDbName(), tagged.getTableName(), tagged);
+        } catch (InvalidOperationException e) {
+            throw toMetaException(e);
+        } catch (NoSuchObjectException e) {
+            throw toMetaException(e);
+        }
+
+        final String dbTopic = getTopicPrefix(hiveConf) + "." + tagged.getDbName().toLowerCase();
+        send(messageFactory.buildCreateTableMessage(tagged), dbTopic);
+    }
+
+    /** Wraps an exception in a MetaException carrying its string form and the original cause. */
+    private static MetaException toMetaException(Exception cause) {
+        MetaException wrapped = new MetaException(cause.toString());
+        wrapped.initCause(cause);
+        return wrapped;
+    }
+
+    /**
+     * Reads the topic prefix used for database- and table-level notifications.
+     *
+     * @param conf configuration to read {@link HCatConstants#HCAT_MSGBUS_TOPIC_PREFIX} from.
+     * @return the configured prefix, or {@link HCatConstants#HCAT_DEFAULT_TOPIC_PREFIX}
+     *         when the property is not set.
+     */
+    private String getTopicPrefix(HiveConf conf) {
+        return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
+            HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+    }
+
+    /**
+     * Send dropped table notifications. The message is sent on the topic
+     * prefix.dbname (database name lower-cased); subscribers can filter with message
+     * selector string
+     * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+     * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT}
+     * <br/>
+     * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+     * found to throw NPE when serializing objects that contain null. This should be revisited
+     * after HIVE-2084 "Upgrade datanucleus from 2.0.3 to 3.0.1" is resolved.
+     * NOTE(review): unlike onDropPartition, this method does not override any
+     * StorageDescriptor fields; the table is passed to the message factory unchanged.
+     *
+     * @param tableEvent the drop-table event reported by the metastore.
+     * @throws MetaException declared by the listener contract.
+     */
+    @Override
+    public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+        // Nothing is sent unless the drop actually succeeded.
+
+        if (tableEvent.getStatus()) {
+            Table table = tableEvent.getTable();
+            // Table-level events are grouped per database: <prefix>.<dbname>.
+            String topicName = getTopicPrefix(tableEvent.getHandler().getHiveConf()) + "." + table.getDbName().toLowerCase();
+            send(messageFactory.buildDropTableMessage(table), topicName);
+        }
+    }
+
+    /**
+     * Sends one event message on the given topic and commits the transacted session.
+     * Delivery is best effort: every failure is logged and swallowed so that a message bus
+     * problem never fails the metastore operation that triggered the event. If no session
+     * exists yet, one connection attempt is made before giving up. The producer created
+     * for the message is always closed before returning.
+     * NOTE(review): the single shared Session is used without synchronization; JMS sessions
+     * are not meant for concurrent use, so confirm how the metastore invokes listeners.
+     *
+     * @param hCatEventMessage The HCatEventMessage being sent over JMS.
+     * @param topicName is the name on message broker on which message is sent.
+     */
+    protected void send(HCatEventMessage hCatEventMessage, String topicName) {
+        MessageProducer producer = null;
+        try {
+            if (null == session) {
+                // This happens if we were never able to establish a connection.
+                createConnection();
+                if (null == session) {
+                    // Still not successful, return from here.
+                    LOG.error("Invalid session. Failed to send message on topic: " +
+                            topicName + " event: " + hCatEventMessage.getEventType());
+                    return;
+                }
+            }
+
+            Destination topic = getTopic(topicName);
+
+            if (null == topic) {
+                // No destination could be obtained, return from here.
+                LOG.error("Invalid topic. Failed to send message on topic: " +
+                        topicName + " event: " + hCatEventMessage.getEventType());
+                return;
+            }
+
+            producer = session.createProducer(topic);
+            Message msg = session.createTextMessage(hCatEventMessage.toString());
+
+            msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
+            msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
+            msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+            producer.send(msg);
+            // Message must be transacted before we return.
+            session.commit();
+        } catch (Exception e) {
+            // Gobble up the exception. Message delivery is best effort.
+            LOG.error("Failed to send message on topic: " + topicName +
+                    " event: " + hCatEventMessage.getEventType(), e);
+        } finally {
+            // A producer is created per message; close it so producers do not accumulate.
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (JMSException e) {
+                    LOG.warn("Failed to close message producer for topic: " + topicName, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Resolves the topic object for a topic name using the current session. If the session
+     * rejects the call with an IllegalStateException, the connection is assumed to be lost:
+     * a reconnect is attempted and the lookup is retried exactly once.
+     *
+     * @param topicName The String identifying the message-topic.
+     * @return A {@link Topic} object corresponding to the specified topicName.
+     * @throws JMSException if the topic cannot be created, including on the retry.
+     */
+    protected Topic getTopic(final String topicName) throws JMSException {
+        try {
+            // Topics are created on demand. If it doesn't exist on broker it will
+            // be created when broker receives this message.
+            return session.createTopic(topicName);
+        } catch (IllegalStateException staleSession) {
+            // We had a working connection once but it is no longer valid; reconnect and retry.
+            LOG.error("Seems like connection is lost. Retrying", staleSession);
+            createConnection();
+            return session.createTopic(topicName);
+        }
+    }
+
+    protected void createConnection() {
+
+        Context jndiCntxt;
+        try {
+            jndiCntxt = new InitialContext();
+            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
+                .lookup("ConnectionFactory");
+            Connection conn = connFac.createConnection();
+            conn.start();
+            conn.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException jmse) {
+                    LOG.error(jmse.toString());
+                }
+            });
+            // We want message to be sent when session commits, thus we run in
+            // transacted mode.
+            session = conn.createSession(true, Session.SESSION_TRANSACTED);
+        } catch (NamingException e) {
+            LOG.error("JNDI error while setting up Message Bus connection. "
+                + "Please make sure file named 'jndi.properties' is in "
+                + "classpath and contains appropriate key-value pairs.", e);
+        } catch (JMSException e) {
+            LOG.error("Failed to initialize connection to message bus", e);
+        } catch (Throwable t) {
+            LOG.error("Unable to connect to JMS provider", t);
+        }
+    }
+
+    /**
+     * Releases the message bus session and connection before the listener is collected.
+     * The two are closed independently so that a failure closing the session does not
+     * prevent the connection from being closed; failures are logged, not thrown.
+     *
+     * @throws Throwable only if the superclass finalizer throws.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        try {
+            if (null != session) {
+                try {
+                    session.close();
+                } catch (Exception e) {
+                    LOG.info("Failed to close message bus session.", e);
+                }
+            }
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (Exception e) {
+                    LOG.info("Failed to close message bus connection.", e);
+                }
+            }
+        } finally {
+            // Always chain to the superclass finalizer.
+            super.finalize();
+        }
+    }
+
+    /**
+     * Currently a no-op: no notification is sent for load-partition-done events.
+     *
+     * @param lpde the load-partition-done event reported by the metastore.
+     * @throws MetaException declared by the listener contract; never thrown here.
+     */
+    @Override
+    public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+        throws MetaException {
+        // TODO: Fix LoadPartitionDoneEvent. Currently, LPDE can only carry a single
+        // partition-spec. And that defeats the purpose.
+        // Disabled sketch of the intended behaviour:
+        //   if (lpde.getStatus())
+        //     send(lpde.getPartitionName(), lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME), HCatConstants.HCAT_PARTITION_DONE_EVENT);
+    }
+
+    /**
+     * Alter-partition events are ignored; no notification is sent.
+     *
+     * @param ape the alter-partition event reported by the metastore.
+     * @throws MetaException declared by the listener contract; never thrown here.
+     */
+    @Override
+    public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+        // no-op
+    }
+
+    /**
+     * Alter-table events are ignored; no notification is sent.
+     *
+     * @param ate the alter-table event reported by the metastore.
+     * @throws MetaException declared by the listener contract; never thrown here.
+     */
+    @Override
+    public void onAlterTable(AlterTableEvent ate) throws MetaException {
+        // no-op
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The HCat message sent when partition(s) are added to a table.
+ */
+public abstract class AddPartitionMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the ADD_PARTITION event type. */
+    protected AddPartitionMessage() {
+        super(EventType.ADD_PARTITION);
+    }
+
+    /**
+     * Getter for name of table (where partitions are added).
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    /**
+     * Getter for list of partitions added.
+     * @return List of maps, where each map identifies values for each partition-key, for every added partition.
+     */
+    public abstract List<Map<String, String>> getPartitions();
+
+    /**
+     * Verifies that both the table name and the partition list are set, then defers to the
+     * superclass check.
+     * @return the result of the superclass validity check.
+     * @throws IllegalStateException if the table name or the partition list is null.
+     */
+    @Override
+    public HCatEventMessage checkValid() {
+        if (null == getTable()) {
+            throw new IllegalStateException("Table name unset.");
+        }
+        if (null == getPartitions()) {
+            throw new IllegalStateException("Partition-list unset.");
+        }
+        return super.checkValid();
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is created in HCatalog.
+ * Adds no accessors or validation of its own beyond {@link HCatEventMessage}.
+ */
+public abstract class CreateDatabaseMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the CREATE_DATABASE event type. */
+    protected CreateDatabaseMessage() {
+        super(EventType.CREATE_DATABASE);
+    }
+
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a table is created in HCatalog.
+ */
+public abstract class CreateTableMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the CREATE_TABLE event type. */
+    protected CreateTableMessage() {
+        super(EventType.CREATE_TABLE);
+    }
+
+    /**
+     * Getter for the name of table created in HCatalog.
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    /**
+     * Verifies that the table name is set, then defers to the superclass check.
+     * @return the result of the superclass validity check.
+     * @throws IllegalStateException if the table name is null.
+     */
+    @Override
+    public HCatEventMessage checkValid() {
+        if (null == getTable()) {
+            throw new IllegalStateException("Table name unset.");
+        }
+        return super.checkValid();
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is dropped from HCatalog.
+ * Adds no accessors or validation of its own beyond {@link HCatEventMessage}.
+ */
+public abstract class DropDatabaseMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the DROP_DATABASE event type. */
+    protected DropDatabaseMessage() {
+        super(EventType.DROP_DATABASE);
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HCat message sent when a partition is dropped in HCatalog.
+ */
+public abstract class DropPartitionMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the DROP_PARTITION event type. */
+    protected DropPartitionMessage() {
+        super(EventType.DROP_PARTITION);
+    }
+
+    /**
+     * Getter for name of table (from which partitions are dropped).
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    /**
+     * Getter for list of partitions dropped.
+     * @return List of maps of partition-key to value, one map per dropped partition
+     *         (presumably mirroring AddPartitionMessage; confirm against implementations).
+     */
+    public abstract List<Map<String, String>> getPartitions();
+
+    /**
+     * Verifies that both the table name and the partition list are set, then defers to the
+     * superclass check.
+     * @return the result of the superclass validity check.
+     * @throws IllegalStateException if the table name or the partition list is null.
+     */
+    @Override
+    public HCatEventMessage checkValid() {
+        if (null == getTable()) {
+            throw new IllegalStateException("Table name unset.");
+        }
+        if (null == getPartitions()) {
+            throw new IllegalStateException("Partition-list unset.");
+        }
+        return super.checkValid();
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Table is dropped in HCatalog.
+ */
+public abstract class DropTableMessage extends HCatEventMessage {
+
+    /** Creates a message tagged with the DROP_TABLE event type. */
+    protected DropTableMessage() {
+        super(EventType.DROP_TABLE);
+    }
+
+    /**
+     * Getter for the name of the table being dropped.
+     * @return Table-name (String).
+     */
+    public abstract String getTable();
+
+    /**
+     * Verifies that the table name is set, then defers to the superclass check.
+     * @return the result of the superclass validity check.
+     * @throws IllegalStateException if the table name is null.
+     */
+    @Override
+    public HCatEventMessage checkValid() {
+        if (null == getTable()) {
+            throw new IllegalStateException("Table name unset.");
+        }
+        return super.checkValid();
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Base class for the messages emitted after Metastore operations
+ * (e.g. creating or dropping databases, tables and partitions).
+ */
+public abstract class HCatEventMessage {
+
+    /**
+     * The kinds of Metastore operation a message can describe. Each constant
+     * carries the string taken from the matching {@link HCatConstants} entry;
+     * that string is what {@link #toString()} returns.
+     */
+    public enum EventType {
+
+        CREATE_DATABASE(HCatConstants.HCAT_CREATE_DATABASE_EVENT),
+        DROP_DATABASE(HCatConstants.HCAT_DROP_DATABASE_EVENT),
+        CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT),
+        DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT),
+        ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT),
+        DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT);
+
+        private final String typeString;
+
+        EventType(String typeString) {
+            this.typeString = typeString;
+        }
+
+        /**
+         * @return The string this constant was constructed with
+         *         (not necessarily the constant's name).
+         */
+        @Override
+        public String toString() {
+            return typeString;
+        }
+    }
+
+    /** Kind of operation this message describes; assigned by the constructor. */
+    protected EventType eventType;
+
+    /**
+     * @param eventType Kind of Metastore operation the concrete message represents.
+     */
+    protected HCatEventMessage(EventType eventType) {
+        this.eventType = eventType;
+    }
+
+    /**
+     * @return The kind of Metastore operation this message describes.
+     */
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    /**
+     * URL of the HCatalog Server the event originated from.
+     * @return HCatalog Server's URL (String).
+     */
+    public abstract String getServer();
+
+    /**
+     * Kerberos principal of the HCatalog service.
+     * @return HCatalog Service Principal (String).
+     */
+    public abstract String getServicePrincipal();
+
+    /**
+     * Name of the Database on which the Metastore operation was done.
+     * @return Database-name (String).
+     */
+    public abstract String getDB();
+
+    /**
+     * Timestamp associated with the operation.
+     * @return Timestamp (Long - seconds since epoch).
+     */
+    public abstract Long getTimestamp();
+
+    /**
+     * Class invariant, to be checked after construction or deserialization.
+     * The checks run in a fixed order: server/principal, event type, database.
+     *
+     * @return This message, so the call can be chained.
+     * @throws IllegalStateException if the server URL, the service principal
+     *         or the event type is null.
+     * @throws IllegalArgumentException if the database name is null.
+     */
+    public HCatEventMessage checkValid() {
+        final boolean originMissing = getServer() == null || getServicePrincipal() == null;
+        if (originMissing) {
+            throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null.");
+        }
+
+        if (getEventType() == null) {
+            throw new IllegalStateException("Event-type unset.");
+        }
+
+        // NOTE(review): unlike the checks above this one raises
+        // IllegalArgumentException; kept as-is because callers may rely on it.
+        if (getDB() == null) {
+            throw new IllegalArgumentException("DB-name unset.");
+        }
+
+        return this;
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * Abstract base class for converting HCat events from String-form back to
+ * HCatEventMessage instances. Concrete subclasses supply one de-serialization
+ * method per message type; {@link #getHCatEventMessage(String, String)}
+ * dispatches to them based on the event type.
+ */
+public abstract class MessageDeserializer {
+
+    /**
+     * Method to construct HCatEventMessage from string.
+     *
+     * @param eventTypeString Identifies the kind of event. Either the name of an
+     *                        {@link HCatEventMessage.EventType} constant or the
+     *                        string that constant's {@code toString()} returns.
+     * @param messageBody The serialized message, handed unchanged to the
+     *                    type-specific de-serialization method.
+     * @return The de-serialized message.
+     * @throws IllegalArgumentException if eventTypeString matches no supported event type.
+     */
+    public HCatEventMessage getHCatEventMessage(String eventTypeString, String messageBody) {
+
+        switch (resolveEventType(eventTypeString)) {
+        case CREATE_DATABASE:
+            return getCreateDatabaseMessage(messageBody);
+        case DROP_DATABASE:
+            return getDropDatabaseMessage(messageBody);
+        case CREATE_TABLE:
+            return getCreateTableMessage(messageBody);
+        case DROP_TABLE:
+            return getDropTableMessage(messageBody);
+        case ADD_PARTITION:
+            return getAddPartitionMessage(messageBody);
+        case DROP_PARTITION:
+            return getDropPartitionMessage(messageBody);
+
+        default:
+            // Only reachable if a new EventType is added without a case above.
+            throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+        }
+    }
+
+    /**
+     * Maps an event-type string to its EventType constant. Constant names are
+     * tried first (this is what Enum.valueOf() accepted), then the strings the
+     * constants report through toString(), so that messages stay readable even
+     * if the two ever differ.
+     */
+    private static HCatEventMessage.EventType resolveEventType(String eventTypeString) {
+        final HCatEventMessage.EventType[] candidates = HCatEventMessage.EventType.values();
+
+        for (HCatEventMessage.EventType candidate : candidates) {
+            if (candidate.name().equals(eventTypeString)) {
+                return candidate;
+            }
+        }
+        for (HCatEventMessage.EventType candidate : candidates) {
+            if (candidate.toString().equals(eventTypeString)) {
+                return candidate;
+            }
+        }
+
+        throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+    }
+
+    /**
+     * Method to de-serialize CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage getCreateTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropTableMessage instance.
+     */
+    public abstract DropTableMessage getDropTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage getAddPartitionMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
+
+    // Protection against construction.
+    protected MessageDeserializer() {}
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.messaging.json.JSONMessageFactory;
+
+/**
+ * Abstract Factory for the construction of HCatalog message instances.
+ * The concrete implementation is looked up in the configuration under
+ * "hcatalog.message.factory.impl.&lt;format&gt;", where the format comes from
+ * "hcatalog.message.format" (default "json"). If nothing is configured, the
+ * JSON implementation is used.
+ */
+public abstract class MessageFactory {
+
+    // Created on first use by getInstance(). An eager initializer here would make
+    // the configured implementation unreachable, since the field would never be null.
+    private static MessageFactory instance = null;
+
+    protected static final HiveConf hiveConf = new HiveConf();
+    static {
+        hiveConf.addResource("hive-site.xml");
+    }
+
+    private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl.";
+    private static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format";
+    private static final String HCAT_MESSAGE_FORMAT = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FORMAT, "json");
+    private static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory";
+    private static final String HCAT_MESSAGE_FACTORY_IMPL = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX
+                                                                         + HCAT_MESSAGE_FORMAT,
+                                                                         DEFAULT_MESSAGE_FACTORY_IMPL);
+
+    // ConfVars.varname is the configuration key; Enum.name() would only yield the
+    // constant's identifier, which is not a key and always falls back to "".
+    protected static final String HCAT_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    protected static final String HCAT_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, "");
+
+    /**
+     * Getter for MessageFactory instance. The instance is created on the first
+     * call, from the configured implementation class, and reused afterwards.
+     * @return The shared MessageFactory.
+     * @throws IllegalStateException if the configured implementation class cannot be found.
+     */
+    public static synchronized MessageFactory getInstance() {
+        if (instance == null) {
+            instance = getInstance(HCAT_MESSAGE_FACTORY_IMPL);
+        }
+        return instance;
+    }
+
+    /**
+     * Instantiates the named MessageFactory implementation reflectively.
+     * @param className Fully qualified name of the implementation class.
+     * @return A new instance of that class.
+     * @throws IllegalStateException if the class cannot be found.
+     */
+    private static MessageFactory getInstance(String className) {
+        try {
+            return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf);
+        }
+        catch (ClassNotFoundException classNotFound) {
+            throw new IllegalStateException("Could not construct MessageFactory implementation: " + className,
+                                            classNotFound);
+        }
+    }
+
+    /**
+     * Getter for MessageDeserializer, corresponding to the specified format and version.
+     * A new factory is instantiated for the lookup on every call.
+     * @param format Serialization format for notifications.
+     * @param version Version of serialization format (currently ignored.)
+     * @return MessageDeserializer.
+     */
+    public static MessageDeserializer getDeserializer(String format,
+                                                      String version) {
+        return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format,
+                                        DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer();
+    }
+
+    /**
+     * Getter for the MessageDeserializer matching this factory's format.
+     */
+    public abstract MessageDeserializer getDeserializer();
+
+    /**
+     * Getter for version-string, corresponding to all constructed messages.
+     */
+    public abstract String getVersion();
+
+    /**
+     * Getter for message-format.
+     */
+    public abstract String getMessageFormat();
+
+    /**
+     * Factory method for CreateDatabaseMessage.
+     * @param db The Database being added.
+     * @return CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db);
+
+    /**
+     * Factory method for DropDatabaseMessage.
+     * @param db The Database being dropped.
+     * @return DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db);
+
+    /**
+     * Factory method for CreateTableMessage.
+     * @param table The Table being created.
+     * @return CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage buildCreateTableMessage(Table table);
+
+    /**
+     * Factory method for DropTableMessage.
+     * @param table The Table being dropped.
+     * @return DropTableMessage instance.
+     */
+    public abstract DropTableMessage buildDropTableMessage(Table table);
+
+    /**
+     * Factory method for AddPartitionMessage.
+     * @param table The Table to which the partition is added.
+     * @param partition The Partition being added.
+     * @return AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition);
+
+    /**
+     * Factory method for DropPartitionMessage.
+     * @param table The Table from which the partition is dropped.
+     * @param partition The Partition being dropped.
+     * @return DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition);
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.jms;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+/**
+ * Helper Utility to assist consumers of HCat Messages in extracting
+ * message-content from JMS messages.
+ */
+public class MessagingUtils {
+
+    /**
+     * Method to return HCatEventMessage contained in the JMS message.
+     * The event type, message version and message format are read from the
+     * message's string properties; the body is the text of the message.
+     *
+     * @param message The JMS Message instance; must be a TextMessage.
+     * @return The contained HCatEventMessage
+     * @throws IllegalArgumentException if the message is null or not a TextMessage,
+     *         if its body or event type is null/empty, or if reading it fails
+     *         with a JMSException.
+     */
+    public static HCatEventMessage getMessage(Message message) {
+        // Reject anything that cannot carry a text body up front, instead of
+        // letting the cast below fail with a ClassCastException/NullPointerException.
+        if (!(message instanceof TextMessage))
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                               "Expected a JMS TextMessage, but got: " +
+                                               (message == null ? "null" : message.getClass().getName()));
+
+        try {
+            String messageBody = ((TextMessage)message).getText();
+            String eventType   = message.getStringProperty(HCatConstants.HCAT_EVENT);
+            String messageVersion = message.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION);
+            String messageFormat = message.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT);
+
+            if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType))
+                throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                                   "EventType and/or MessageBody is null/empty.");
+
+            return MessageFactory.getDeserializer(messageFormat, messageVersion).getHCatEventMessage(eventType, messageBody);
+        }
+        catch (JMSException exception) {
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. ", exception);
+        }
+    }
+
+    // Prevent construction.
+    private MessagingUtils() {}
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AddPartitionMessage whose state is held in Jackson-annotated fields.
+ */
+public class JSONAddPartitionMessage extends AddPartitionMessage {
+
+    // Declaration order of the fields is deliberately left as it was.
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor, required for Jackson. Leaves every field
+     * unset and does not validate.
+     */
+    public JSONAddPartitionMessage() {
+    }
+
+    /**
+     * Stores all values and then runs {@code checkValid()} on the new instance.
+     */
+    public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                   List<Map<String, String>> partitions, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.partitions = partitions;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Serializes this message with the mapper held by JSONMessageDeserializer.
+     * @throws IllegalArgumentException wrapping any exception raised while serializing.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateDatabaseMessage whose state is held in Jackson-annotated fields.
+ */
+public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
+
+    // Declaration order of the fields is deliberately left as it was.
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, required for Jackson. Leaves every field
+     * unset and does not validate.
+     */
+    public JSONCreateDatabaseMessage() {
+    }
+
+    /**
+     * Stores all values and then runs {@code checkValid()} on the new instance.
+     */
+    public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Serializes this message with the mapper held by JSONMessageDeserializer.
+     * @throws IllegalArgumentException wrapping any exception raised while serializing.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateTableMessage whose state is held in Jackson-annotated fields.
+ */
+public class JSONCreateTableMessage extends CreateTableMessage {
+
+    // Declaration order of the fields is deliberately left as it was.
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, needed for Jackson. Leaves every field
+     * unset and does not validate.
+     */
+    public JSONCreateTableMessage() {
+    }
+
+    /**
+     * Stores all values and then runs {@code checkValid()} on the new instance.
+     */
+    public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    /**
+     * Serializes this message with the mapper held by JSONMessageDeserializer.
+     * @throws IllegalArgumentException wrapping any exception raised while serializing.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DropDatabaseMessage whose state is held in Jackson-annotated fields.
+ */
+public class JSONDropDatabaseMessage extends DropDatabaseMessage {
+
+    // Declaration order of the fields is deliberately left as it was.
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor, required for Jackson. Leaves every field
+     * unset and does not validate.
+     */
+    public JSONDropDatabaseMessage() {
+    }
+
+    /**
+     * Stores all values and then runs {@code checkValid()} on the new instance.
+     */
+    public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Serializes this message with the mapper held by JSONMessageDeserializer.
+     * @throws IllegalArgumentException wrapping any exception raised while serializing.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DropPartitionMessage whose state is held in Jackson-annotated fields.
+ */
+public class JSONDropPartitionMessage extends DropPartitionMessage {
+
+    // Declaration order of the fields is deliberately left as it was.
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor, required for Jackson. Leaves every field
+     * unset and does not validate.
+     */
+    public JSONDropPartitionMessage() {
+    }
+
+    /**
+     * Stores all values and then runs {@code checkValid()} on the new instance.
+     */
+    public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                    List<Map<String, String>> partitions, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.partitions = partitions;
+        this.timestamp = timestamp;
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Serializes this message with the mapper held by JSONMessageDeserializer.
+     * @throws IllegalArgumentException wrapping any exception raised while serializing.
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONMessageDeserializer.mapper.writeValueAsString(this);
+        } catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DropTableMessage backed by Jackson-annotated fields, so that it can be written to and
+ * read from JSON.
+ */
+public class JSONDropTableMessage extends DropTableMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate this class; all fields stay unset.
+     */
+    public JSONDropTableMessage() {
+    }
+
+    /**
+     * Builds a fully populated message and then runs {@code checkValid()} on it.
+     *
+     * @param server           value returned by {@link #getServer()}
+     * @param servicePrincipal value returned by {@link #getServicePrincipal()}
+     * @param db               value returned by {@link #getDB()}
+     * @param table            value returned by {@link #getTable()}
+     * @param timestamp        value returned by {@link #getTimestamp()}
+     */
+    public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        // Plain field assignments; their relative order is irrelevant.
+        this.timestamp = timestamp;
+        this.table = table;
+        this.db = db;
+        this.servicePrincipal = servicePrincipal;
+        this.server = server;
+
+        // Validation must run last, once every field has been populated.
+        checkValid();
+    }
+
+
+    /**
+     * @return the table name held by this message
+     */
+    @Override
+    public String getTable() {
+        return this.table;
+    }
+
+    /**
+     * @return the server value held by this message
+     */
+    @Override
+    public String getServer() {
+        return this.server;
+    }
+
+    /**
+     * @return the service-principal value held by this message
+     */
+    @Override
+    public String getServicePrincipal() {
+        return this.servicePrincipal;
+    }
+
+    /**
+     * @return the database name held by this message
+     */
+    @Override
+    public String getDB() {
+        return this.db;
+    }
+
+    /**
+     * @return the timestamp held by this message (may be {@code null} if it was never set)
+     */
+    @Override
+    public Long getTimestamp() {
+        return this.timestamp;
+    }
+
+    /**
+     * Renders this message as a JSON string using the shared
+     * {@code JSONMessageDeserializer.mapper}.
+     *
+     * @return the JSON form of this message
+     * @throws IllegalArgumentException wrapping any exception raised while serializing
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception exception) {
+            throw new IllegalArgumentException("Could not serialize: ", exception);
+        }
+        return json;
+    }
+
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * MessageDeserializer implementation, for deserializing from JSON strings.
+ *
+ * Every {@code getXxxMessage} method parses the given JSON body into the matching
+ * {@code JSONXxxMessage} class and throws {@link IllegalArgumentException} (with the
+ * underlying exception as its cause) when the body cannot be parsed.
+ */
+public class JSONMessageDeserializer extends MessageDeserializer {
+
+    // Shared with the JSON message classes in this package, which use it in toString().
+    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    static {
+        // Ignore JSON properties that the target message class does not declare.
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Parses {@code messageBody} into an instance of {@code messageClass}.
+     *
+     * @param messageBody  the JSON text to parse
+     * @param messageClass the concrete JSON message class to instantiate
+     * @return the populated message
+     * @throws IllegalArgumentException naming {@code messageClass} and wrapping whatever
+     *         exception the mapper raised
+     */
+    private static <T> T deserialize(String messageBody, Class<T> messageClass) {
+        try {
+            return mapper.readValue(messageBody, messageClass);
+        }
+        catch (Exception exception) {
+            throw new IllegalArgumentException(
+                    "Could not construct " + messageClass.getSimpleName() + ".", exception);
+        }
+    }
+
+    @Override
+    public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+        return deserialize(messageBody, JSONCreateDatabaseMessage.class);
+    }
+
+    @Override
+    public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+        return deserialize(messageBody, JSONDropDatabaseMessage.class);
+    }
+
+    @Override
+    public CreateTableMessage getCreateTableMessage(String messageBody) {
+        return deserialize(messageBody, JSONCreateTableMessage.class);
+    }
+
+    @Override
+    public DropTableMessage getDropTableMessage(String messageBody) {
+        return deserialize(messageBody, JSONDropTableMessage.class);
+    }
+
+    @Override
+    public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+        return deserialize(messageBody, JSONAddPartitionMessage.class);
+    }
+
+    @Override
+    public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+        return deserialize(messageBody, JSONDropPartitionMessage.class);
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * MessageFactory that produces the JSON flavour of every message type and hands out the
+ * JSON deserializer. Each message is stamped with the current time in whole seconds.
+ */
+public class JSONMessageFactory extends MessageFactory {
+
+    private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
+
+    /**
+     * @return the single JSONMessageDeserializer instance shared by all factories
+     */
+    @Override
+    public MessageDeserializer getDeserializer() {
+        return deserializer;
+    }
+
+    /**
+     * @return the version string of this message format
+     */
+    @Override
+    public String getVersion() {
+        return "0.1";
+    }
+
+    /**
+     * @return the name of this message format
+     */
+    @Override
+    public String getMessageFormat() {
+        return "json";
+    }
+
+    /**
+     * @param db the database that was created
+     * @return a JSON create-database message carrying the database's name
+     */
+    @Override
+    public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
+        String dbName = db.getName();
+        return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                nowInSeconds());
+    }
+
+    /**
+     * @param db the database that was dropped
+     * @return a JSON drop-database message carrying the database's name
+     */
+    @Override
+    public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
+        String dbName = db.getName();
+        return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                nowInSeconds());
+    }
+
+    /**
+     * @param table the table that was created
+     * @return a JSON create-table message carrying the table's database and table names
+     */
+    @Override
+    public CreateTableMessage buildCreateTableMessage(Table table) {
+        String dbName = table.getDbName();
+        String tableName = table.getTableName();
+        return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                tableName, nowInSeconds());
+    }
+
+    /**
+     * @param table the table that was dropped
+     * @return a JSON drop-table message carrying the table's database and table names
+     */
+    @Override
+    public DropTableMessage buildDropTableMessage(Table table) {
+        String dbName = table.getDbName();
+        String tableName = table.getTableName();
+        return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                tableName, nowInSeconds());
+    }
+
+    /**
+     * @param table     the table owning the partition; supplies the partition-key names
+     * @param partition the partition that was added; supplies database/table names and values
+     * @return a JSON add-partition message holding a one-element list with the partition's
+     *         key/value map
+     */
+    @Override
+    public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) {
+        String dbName = partition.getDbName();
+        String tableName = partition.getTableName();
+        Map<String, String> keyValues = getPartitionKeyValues(table, partition);
+        return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                tableName, Arrays.asList(keyValues), nowInSeconds());
+    }
+
+    /**
+     * @param table     the table owning the partition; supplies the partition-key names
+     * @param partition the partition that was dropped; supplies database/table names and values
+     * @return a JSON drop-partition message holding a one-element list with the partition's
+     *         key/value map
+     */
+    @Override
+    public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) {
+        String dbName = partition.getDbName();
+        String tableName = partition.getTableName();
+        Map<String, String> keyValues = getPartitionKeyValues(table, partition);
+        return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, dbName,
+                tableName, Arrays.asList(keyValues), nowInSeconds());
+    }
+
+    /**
+     * @return the current wall-clock time, in whole seconds (milliseconds divided by 1000)
+     */
+    private static long nowInSeconds() {
+        return System.currentTimeMillis() / 1000;
+    }
+
+    /**
+     * Pairs the i-th partition-key name of {@code table} with the i-th value of
+     * {@code partition}, preserving the table's key order.
+     */
+    private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+        Map<String, String> keyValues = new LinkedHashMap<String, String>();
+        int position = 0;
+        while (position < table.getPartitionKeysSize()) {
+            String keyName = table.getPartitionKeys().get(position).getName();
+            String value = partition.getValues().get(position);
+            keyValues.put(keyName, value);
+            ++position;
+        }
+        return keyValues;
+    }
+}

Added: hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java Fri Sep  6 00:49:14 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
+
+/**
+ * Checks that database events issued through the Hive driver show up as JMS text messages
+ * on the configured topic, and that they keep showing up after the message broker has been
+ * stopped and restarted.
+ */
+public class TestMsgBusConnection extends TestCase {
+
+    private static final String BROKER_URL = "tcp://localhost:61616";
+    private static final String TOPIC_NAME = "planetlab.hcat";
+    private static final String TEST_DB = "testconndb";
+
+    private Driver driver;
+    private BrokerService broker;
+    private Connection clientConnection;
+    private MessageConsumer consumer;
+
+    @Override
+    protected void setUp() throws Exception {
+
+        super.setUp();
+        broker = new BrokerService();
+        // configure the broker
+        broker.addConnector(BROKER_URL + "?broker.persistent=false");
+
+        broker.start();
+
+        System.setProperty("java.naming.factory.initial",
+                "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+        System.setProperty("java.naming.provider.url", BROKER_URL);
+        connectClient();
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+                NotificationListener.class.getName());
+        hiveConf.set("hive.metastore.local", "true");
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, TOPIC_NAME);
+        SessionState.start(new CliSessionState(hiveConf));
+        driver = new Driver(hiveConf);
+    }
+
+    /**
+     * Releases the JMS client connection and stops the embedded broker so that the
+     * broker port is not left bound after the test.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            closeClient();
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Opens a fresh connection to the broker and subscribes {@link #consumer} to the test
+     * topic. Any connection left over from an earlier call is closed first.
+     */
+    private void connectClient() throws JMSException {
+        closeClient();
+        ConnectionFactory connFac = new ActiveMQConnectionFactory(BROKER_URL);
+        clientConnection = connFac.createConnection();
+        clientConnection.start();
+        Session session = clientConnection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination hcatTopic = session.createTopic(TOPIC_NAME);
+        consumer = session.createConsumer(hcatTopic);
+    }
+
+    /**
+     * Best-effort close of the current client connection, if there is one.
+     */
+    private void closeClient() {
+        if (clientConnection == null) {
+            return;
+        }
+        try {
+            clientConnection.close();
+        } catch (JMSException ignored) {
+            // The broker may already be down (the test stops it on purpose); there is
+            // nothing useful to do about a failed close of a connection we are discarding.
+        } finally {
+            clientConnection = null;
+            consumer = null;
+        }
+    }
+
+    /**
+     * Blocks until the next message arrives on the topic, then verifies that it is a text
+     * message of the expected event type, was sent to the test topic, and that its parsed
+     * body names the test database.
+     *
+     * @param expectedEvent expected value of the {@code HCatConstants.HCAT_EVENT} property
+     */
+    private void assertNextEvent(String expectedEvent) throws Exception {
+        Message msg = consumer.receive();
+        assertTrue("Expected TextMessage", msg instanceof TextMessage);
+        assertEquals(expectedEvent, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+        assertEquals("topic://" + TOPIC_NAME, msg.getJMSDestination().toString());
+        // Parse THIS message; re-checking an object parsed from an earlier message
+        // would verify nothing about the current one.
+        HCatEventMessage messageObject = MessagingUtils.getMessage(msg);
+        assertEquals(TEST_DB, messageObject.getDB());
+    }
+
+    public void testConnection() throws Exception {
+
+        try {
+            driver.run("create database " + TEST_DB);
+            assertNextEvent(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+
+            // Drop the database while the broker is down, then bring the broker back and
+            // re-subscribe. The next message this test expects is the CREATE issued after
+            // the restart.
+            broker.stop();
+            driver.run("drop database " + TEST_DB + " cascade");
+            broker.start(true);
+            connectClient();
+
+            driver.run("create database " + TEST_DB);
+            assertNextEvent(HCatConstants.HCAT_CREATE_DATABASE_EVENT);
+
+            driver.run("drop database " + TEST_DB + " cascade");
+            assertNextEvent(HCatConstants.HCAT_DROP_DATABASE_EVENT);
+        } catch (NoSuchObjectException nsoe) {
+            nsoe.printStackTrace(System.err);
+            // fail() works regardless of whether the JVM runs with -ea, unlike "assert false".
+            fail("Unexpected NoSuchObjectException: " + nsoe);
+        } catch (AlreadyExistsException aee) {
+            aee.printStackTrace(System.err);
+            fail("Unexpected AlreadyExistsException: " + aee);
+        }
+    }
+}