You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kh...@apache.org on 2013/09/06 02:49:17 UTC
svn commit: r1520466 [16/18] - in /hive/trunk/hcatalog:
core/src/main/java/org/apache/hcatalog/cli/
core/src/main/java/org/apache/hcatalog/cli/SemanticAnalysis/
core/src/main/java/org/apache/hcatalog/common/
core/src/main/java/org/apache/hcatalog/data/...
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
+import org.apache.hadoop.hive.metastore.events.CreateDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.DropPartitionEvent;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of
+ * {@link org.apache.hadoop.hive.metastore.MetaStoreEventListener}. It sends
+ * messages on two types of topics. One has a name of the form dbName.tblName. On
+ * this topic, two kinds of messages are sent: add/drop partition and
+ * finalize_partition messages. The second topic is named "HCAT", and the messages
+ * sent on it are: add/drop database and add/drop table. All messages also have a
+ * property named "HCAT_EVENT" set on them, whose value can be used to configure
+ * a message selector on the subscriber side.
+ */
+public class NotificationListener extends MetaStoreEventListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
+ protected Session session;
+ protected Connection conn;
+ private static MessageFactory messageFactory = MessageFactory.getInstance();
+
+ /**
+ * Creates the message bus connection and session; createConnection() logs failures instead of throwing them.
+ */
+ public NotificationListener(final Configuration conf) {
+
+ super(conf);
+ createConnection();
+ }
+
+ /** Returns the table's HCAT_MSGBUS_TOPIC_NAME parameter (null when the key is absent). */
+ private static String getTopicName(Partition partition, ListenerEvent partitionEvent) throws MetaException {
+   try {
+     return partitionEvent.getHandler().get_table(partition.getDbName(), partition.getTableName())
+         .getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME);
+   } catch (NoSuchObjectException e) {
+     // Chain the cause (as onCreateTable does) so the original stack trace is not lost.
+     throw (MetaException) new MetaException(e.toString()).initCause(e);
+   }
+ }
+
+ @Override
+ public void onAddPartition(AddPartitionEvent partitionEvent) throws MetaException {
+   // Subscribers learn about newly added partitions of a table by listening on the
+   // topic stored in the table's HCAT_MSGBUS_TOPIC_NAME parameter, using the message
+   // selector string "HCAT_EVENT = HCAT_ADD_PARTITION".
+   if (!partitionEvent.getStatus()) {
+     return; // only events whose status flag is set are published
+   }
+
+   final Partition addedPartition = partitionEvent.getPartition();
+   final String topic = getTopicName(addedPartition, partitionEvent);
+   final boolean topicMissing = topic == null || topic.equals("");
+   if (topicMissing) {
+     LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+         + addedPartition.getDbName()
+         + "."
+         + addedPartition.getTableName()
+         + " To enable notifications for this table, please do alter table set properties ("
+         + HCatConstants.HCAT_MSGBUS_TOPIC_NAME
+         + "=<dbname>.<tablename>) or whatever you want topic name to be.");
+     return;
+   }
+   send(messageFactory.buildAddPartitionMessage(partitionEvent.getTable(), addedPartition), topic);
+ }
+
+ /**
+  * Send dropped partition notifications. Subscribers can receive these notifications for a
+  * particular table by listening on a topic named "dbName.tableName" with message selector
+  * string {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+  * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_PARTITION_EVENT}.
+  * <br/>
+  * TODO: DataNucleus 2.0.3, currently used by the HiveMetaStore for persistence, has been
+  * found to throw NPE when serializing objects that contain null. For this reason we override
+  * some fields in the StorageDescriptor of this notification. NOTE(review): the descriptor and
+  * its serdeInfo/skewedInfo are assumed nullable and skipped when null. See HIVE-2084.
+  */
+ @Override
+ public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
+   if (!partitionEvent.getStatus()) {
+     return;
+   }
+   Partition partition = partitionEvent.getPartition();
+   StorageDescriptor sd = partition.getSd();
+   if (sd != null) {
+     sd.setBucketCols(new ArrayList<String>());
+     sd.setSortCols(new ArrayList<Order>());
+     sd.setParameters(new HashMap<String, String>());
+     if (sd.getSerdeInfo() != null) { sd.getSerdeInfo().setParameters(new HashMap<String, String>()); }
+     if (sd.getSkewedInfo() != null) { sd.getSkewedInfo().setSkewedColNames(new ArrayList<String>()); }
+   }
+   String topicName = getTopicName(partition, partitionEvent);
+   if (topicName != null && !topicName.equals("")) {
+     send(messageFactory.buildDropPartitionMessage(partitionEvent.getTable(), partition), topicName);
+     return;
+   }
+   LOG.info("Topic name not found in metastore. Suppressing HCatalog notification for "
+       + partition.getDbName() + "." + partition.getTableName()
+       + " To enable notifications for this table, please do alter table set properties ("
+       + HCatConstants.HCAT_MSGBUS_TOPIC_NAME + "=<dbname>.<tablename>) or whatever you want topic name to be.");
+ }
+
+ @Override
+ public void onCreateDatabase(CreateDatabaseEvent dbEvent) throws MetaException {
+   // Subscribers are told about a new database on the topic named by the configured
+   // topic prefix (see getTopicPrefix); they can filter on the HCAT_EVENT message
+   // property, which send() sets to the event type of the message.
+   if (!dbEvent.getStatus()) {
+     return;
+   }
+   final String prefixTopic = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+   send(messageFactory.buildCreateDatabaseMessage(dbEvent.getDatabase()), prefixTopic);
+ }
+
+ @Override
+ public void onDropDatabase(DropDatabaseEvent dbEvent) throws MetaException {
+   // Subscribers are told about a dropped database on the topic named by the configured
+   // topic prefix; they can filter on the HCAT_EVENT message property set by send().
+   if (!dbEvent.getStatus()) {
+     return;
+   }
+   final String prefixTopic = getTopicPrefix(dbEvent.getHandler().getHiveConf());
+   send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), prefixTopic);
+ }
+
+ /**
+  * Stores a per-table topic name (prefix.db.table, lower-cased) in the new table's parameters via
+  * alter_table, then publishes a create-table message on the database-level topic (prefix.db).
+  * Subscribers can filter on the HCAT_EVENT message property.
+  *
+  * @throws MetaException wrapping the cause if the table cannot be re-read or altered
+  */
+ @Override
+ public void onCreateTable(CreateTableEvent tableEvent) throws MetaException {
+   if (!tableEvent.getStatus()) {
+     return;
+   }
+   final Table createdTbl = tableEvent.getTable();
+   final HMSHandler hmsHandler = tableEvent.getHandler();
+   final HiveConf hiveConf = hmsHandler.getHiveConf();
+   Table tblCopy;
+   try {
+     // Re-read the stored table and modify a deep copy of it.
+     tblCopy = hmsHandler.get_table(createdTbl.getDbName(), createdTbl.getTableName()).deepCopy();
+     String tableTopic = getTopicPrefix(hiveConf) + "." + tblCopy.getDbName().toLowerCase() + "."
+         + tblCopy.getTableName().toLowerCase();
+     tblCopy.getParameters().put(HCatConstants.HCAT_MSGBUS_TOPIC_NAME, tableTopic);
+     hmsHandler.alter_table(tblCopy.getDbName(), tblCopy.getTableName(), tblCopy);
+   } catch (InvalidOperationException e) {
+     throw (MetaException) new MetaException(e.toString()).initCause(e);
+   } catch (NoSuchObjectException e) {
+     throw (MetaException) new MetaException(e.toString()).initCause(e);
+   }
+   String dbTopic = getTopicPrefix(hiveConf) + "." + tblCopy.getDbName().toLowerCase();
+   send(messageFactory.buildCreateTableMessage(tblCopy), dbTopic);
+ }
+
+ /** Reads HCAT_MSGBUS_TOPIC_PREFIX from conf, falling back to HCAT_DEFAULT_TOPIC_PREFIX. */
+ private String getTopicPrefix(HiveConf conf) {
+   return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
+ }
+
+ /**
+  * Send dropped table notifications. Subscribers can receive these notifications for
+  * dropped tables by listening on the topic made of the configured topic prefix, a dot and
+  * the lower-cased database name, with message selector string
+  * {@value org.apache.hcatalog.common.HCatConstants#HCAT_EVENT} =
+  * {@value org.apache.hcatalog.common.HCatConstants#HCAT_DROP_TABLE_EVENT}
+  * <br/>
+  * NOTE(review): a TODO here used to say that StorageDescriptor fields are overridden to avoid
+  * a DataNucleus 2.0.3 NPE on objects containing null (HIVE-2084). Unlike onDropPartition, this
+  * method overrides nothing: the table is handed to the message factory as received.
+  */
+ @Override
+ public void onDropTable(DropTableEvent tableEvent) throws MetaException {
+   // Only events whose status flag is set are published.
+   if (!tableEvent.getStatus()) {
+     return;
+   }
+
+   final Table droppedTable = tableEvent.getTable();
+   final HiveConf hiveConf = tableEvent.getHandler().getHiveConf();
+   // All tables of a database share one topic: the configured prefix, a dot, and the
+   // lower-cased database name.
+   final String dbTopic = getTopicPrefix(hiveConf) + "." + droppedTable.getDbName().toLowerCase();
+
+   send(messageFactory.buildDropTableMessage(droppedTable), dbTopic);
+ }
+
+ /**
+  * Publishes the message on the given topic. Delivery is best effort: failures are logged, never thrown.
+  *
+  * @param hCatEventMessage The HCatEventMessage being sent over JMS.
+  * @param topicName is the name on message broker on which message is sent.
+  */
+ protected void send(HCatEventMessage hCatEventMessage, String topicName) {
+   MessageProducer producer = null;
+   try {
+     if (null == session) {
+       // this will happen, if we never able to establish a connection.
+       createConnection();
+       if (null == session) {
+         // Still not successful, return from here.
+         LOG.error("Invalid session. Failed to send message on topic: " + topicName + " event: " + hCatEventMessage.getEventType());
+         return;
+       }
+     }
+     Destination topic = getTopic(topicName);
+     if (null == topic) {
+       // A null topic is not a session failure, so report it as a topic problem.
+       LOG.error("Invalid topic. Failed to send message on topic: " + topicName + " event: " + hCatEventMessage.getEventType());
+       return;
+     }
+     producer = session.createProducer(topic);
+     Message msg = session.createTextMessage(hCatEventMessage.toString());
+     msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
+     msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
+     msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+     producer.send(msg);
+     // Message must be transacted before we return.
+     session.commit();
+   } catch (Exception e) {
+     // Gobble up the exception. Message delivery is best effort.
+     LOG.error("Failed to send message on topic: " + topicName + " event: " + hCatEventMessage.getEventType(), e);
+   } finally {
+     // A producer is created per call; close it so producers do not accumulate on the session.
+     if (producer != null) {
+       try { producer.close(); } catch (JMSException e) { LOG.warn("Failed to close message producer", e); }
+     }
+   }
+ }
+
+ /**
+  * Resolves the JMS topic for topicName, reconnecting and retrying once when the
+  * session reports an IllegalStateException.
+  *
+  * @param topicName The String identifying the message-topic.
+  * @return A {@link Topic} object corresponding to the specified topicName.
+  * @throws JMSException if the topic cannot be created, including on the retry
+  */
+ protected Topic getTopic(final String topicName) throws JMSException {
+   try {
+     // Topics are created on demand. If it doesn't exist on broker it will
+     // be created when broker receives this message.
+     return session.createTopic(topicName);
+   } catch (IllegalStateException ise) {
+     // The connection was established once but is no longer valid: log, reconnect,
+     // and fall through to a single retry below.
+     LOG.error("Seems like connection is lost. Retrying", ise);
+     createConnection();
+   }
+   // createConnection() only logs its own failures, so this retry may still run against
+   // the old session and throw again; that exception is left to the caller.
+   return session.createTopic(topicName);
+ }
+
+ /**
+  * Opens the JMS connection and a transacted session (JNDI name "ConnectionFactory"); failures are only logged.
+  */
+ protected void createConnection() {
+   try {
+     Context jndiCntxt = new InitialContext();
+     ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+     // Assign the field, not a shadowing local: finalize() closes this.conn, which otherwise stays null.
+     conn = connFac.createConnection();
+     conn.start();
+     conn.setExceptionListener(new ExceptionListener() {
+       @Override
+       public void onException(JMSException jmse) {
+         LOG.error(jmse.toString());
+       }
+     });
+     // We want message to be sent when session commits, thus we run in transacted mode.
+     session = conn.createSession(true, Session.SESSION_TRANSACTED);
+   } catch (NamingException e) {
+     LOG.error("JNDI error while setting up Message Bus connection. "
+         + "Please make sure file named 'jndi.properties' is in "
+         + "classpath and contains appropriate key-value pairs.", e);
+   } catch (JMSException e) {
+     LOG.error("Failed to initialize connection to message bus", e);
+   } catch (Throwable t) {
+     LOG.error("Unable to connect to JMS provider", t);
+   }
+ }
+
+ /** Best-effort cleanup: closes the session, then the connection; a failure is logged at info level. */
+ @Override
+ protected void finalize() throws Throwable {
+   try {
+     if (session != null) {
+       session.close();
+     }
+     if (null != conn) {
+       conn.close();
+     }
+   } catch (Exception closeFailure) {
+     LOG.info("Failed to close message bus connection.", closeFailure);
+   }
+ }
+
+ @Override
+ public void onLoadPartitionDone(LoadPartitionDoneEvent lpde)
+ throws MetaException {
+// TODO: Fix LoadPartitionDoneEvent. Currently, an LPDE can only carry a single partition-spec, which defeats the purpose, so nothing is sent here yet.
+// if(lpde.getStatus())
+// send(lpde.getPartitionName(),lpde.getTable().getParameters().get(HCatConstants.HCAT_MSGBUS_TOPIC_NAME),HCatConstants.HCAT_PARTITION_DONE_EVENT);
+ }
+
+ @Override
+ public void onAlterPartition(AlterPartitionEvent ape) throws MetaException {
+ // Intentionally a no-op: no message is sent when a partition is altered.
+ }
+
+ @Override
+ public void onAlterTable(AlterTableEvent ate) throws MetaException {
+ // Intentionally a no-op: no message is sent when a table is altered.
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/AddPartitionMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The HCat message sent when partition(s) are added to a table.
+ */
+public abstract class AddPartitionMessage extends HCatEventMessage {
+
+ protected AddPartitionMessage() {
+   super(EventType.ADD_PARTITION);
+ }
+
+ /** @return name of the table to which the partitions are added (String). */
+ public abstract String getTable();
+
+ /**
+  * Getter for list of partitions added.
+  * @return List of maps, where each map identifies values for each partition-key, for every added partition.
+  */
+ public abstract List<Map<String, String>> getPartitions();
+
+ /** Checks the fields specific to this message first, then the ones common to every message. */
+ @Override
+ public HCatEventMessage checkValid() {
+   if (getTable() == null) {
+     throw new IllegalStateException("Table name unset.");
+   }
+   if (getPartitions() == null) {
+     throw new IllegalStateException("Partition-list unset.");
+   }
+   return super.checkValid();
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateDatabaseMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is created in HCatalog; the constructor only fixes the event type to CREATE_DATABASE.
+ */
+public abstract class CreateDatabaseMessage extends HCatEventMessage {
+
+ protected CreateDatabaseMessage() {
+ super(EventType.CREATE_DATABASE);
+ }
+
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/CreateTableMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a table is created in HCatalog.
+ */
+public abstract class CreateTableMessage extends HCatEventMessage {
+
+ protected CreateTableMessage() {
+   super(EventType.CREATE_TABLE);
+ }
+
+ /** @return the name of the table created in HCatalog (String). */
+ public abstract String getTable();
+
+ /** Fails when the table name is unset; otherwise defers to the base-class checks. */
+ @Override
+ public HCatEventMessage checkValid() {
+   final String tableName = getTable();
+   if (tableName != null) {
+     return super.checkValid();
+   }
+   throw new IllegalStateException("Table name unset.");
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropDatabaseMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Database is dropped from HCatalog; the constructor only fixes the event type to DROP_DATABASE.
+ */
+public abstract class DropDatabaseMessage extends HCatEventMessage {
+
+ protected DropDatabaseMessage() {
+ super(EventType.DROP_DATABASE);
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropPartitionMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import java.util.List;
+import java.util.Map;
+
+/** HCat message sent when a partition is dropped in HCatalog. */
+public abstract class DropPartitionMessage extends HCatEventMessage {
+
+ protected DropPartitionMessage() { super(EventType.DROP_PARTITION); }
+
+ /** @return table name (String); presumably the table the partitions are dropped from. */
+ public abstract String getTable();
+ /** @return presumably one partition-key-to-value map per dropped partition, as in AddPartitionMessage. */
+ public abstract List<Map<String, String>> getPartitions();
+
+ @Override
+ public HCatEventMessage checkValid() {
+   if (getTable() == null) {
+     throw new IllegalStateException("Table name unset.");
+   }
+   if (getPartitions() == null) {
+     throw new IllegalStateException("Partition-list unset.");
+   }
+   return super.checkValid();
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/DropTableMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * HCat message sent when a Table is dropped in HCatalog.
+ */
+public abstract class DropTableMessage extends HCatEventMessage {
+
+ protected DropTableMessage() {
+   super(EventType.DROP_TABLE);
+ }
+
+ /** @return the name of the table being dropped (String). */
+ public abstract String getTable();
+
+ /** Verifies that a table name is present, then runs the checks shared by all messages. */
+ @Override
+ public HCatEventMessage checkValid() {
+   if (getTable() == null) {
+     throw new IllegalStateException("Table name unset.");
+   }
+   // Table name is present; the remaining invariants live in HCatEventMessage.
+   return super.checkValid();
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/HCatEventMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hcatalog.common.HCatConstants;
+
+/**
+ * Class representing messages emitted when Metastore operations are done.
+ * (E.g. Creation and deletion of databases, tables and partitions.)
+ */
+public abstract class HCatEventMessage {
+ /**
+  * Enumeration of all supported types of Metastore operations.
+  */
+ public enum EventType {
+   CREATE_DATABASE(HCatConstants.HCAT_CREATE_DATABASE_EVENT),
+   DROP_DATABASE(HCatConstants.HCAT_DROP_DATABASE_EVENT),
+   CREATE_TABLE(HCatConstants.HCAT_CREATE_TABLE_EVENT),
+   DROP_TABLE(HCatConstants.HCAT_DROP_TABLE_EVENT),
+   ADD_PARTITION(HCatConstants.HCAT_ADD_PARTITION_EVENT),
+   DROP_PARTITION(HCatConstants.HCAT_DROP_PARTITION_EVENT);
+
+   private final String typeString;
+
+   EventType(String typeString) {
+     this.typeString = typeString;
+   }
+
+   @Override
+   public String toString() {
+     return typeString;
+   }
+ }
+
+ protected EventType eventType;
+
+ protected HCatEventMessage(EventType eventType) {
+   this.eventType = eventType;
+ }
+
+ public EventType getEventType() {
+   return eventType;
+ }
+
+ /**
+  * Getter for HCatalog Server's URL.
+  * (This is where the event originates from.)
+  * @return HCatalog Server's URL (String).
+  */
+ public abstract String getServer();
+
+ /**
+  * Getter for the Kerberos principal of the HCatalog service.
+  * @return HCatalog Service Principal (String).
+  */
+ public abstract String getServicePrincipal();
+
+ /**
+  * Getter for the name of the Database on which the Metastore operation is done.
+  * @return Database-name (String).
+  */
+ public abstract String getDB();
+
+ /**
+  * Getter for the timestamp associated with the operation.
+  * @return Timestamp (Long - seconds since epoch).
+  */
+ public abstract Long getTimestamp();
+
+ /** Class invariant. Checked after construction or deserialization. */
+ public HCatEventMessage checkValid() {
+   if (getServer() == null || getServicePrincipal() == null) {
+     throw new IllegalStateException("Server-URL/Service-Principal shouldn't be null.");
+   }
+   if (getEventType() == null) {
+     throw new IllegalStateException("Event-type unset.");
+   }
+   if (getDB() == null) {
+     throw new IllegalArgumentException("DB-name unset."); // differs from the checks above; kept for existing callers
+   }
+   return this;
+ }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageDeserializer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+/**
+ * Abstract base-class for converting HCat events from String-form back to HCatEventMessage instances.
+ * Format-specific sub-classes supply one de-serialization method per event-type;
+ * {@link #getHCatEventMessage(String, String)} dispatches to them.
+ */
+public abstract class MessageDeserializer {
+
+    /**
+     * Method to construct HCatEventMessage from string.
+     * @param eventTypeString Name of the HCatEventMessage.EventType constant describing the message.
+     * @param messageBody The serialized message.
+     * @return The message instance produced by the matching de-serialization method.
+     * @throws IllegalArgumentException if eventTypeString does not name a supported event-type.
+     */
+    public HCatEventMessage getHCatEventMessage(String eventTypeString, String messageBody) {
+
+        final HCatEventMessage.EventType eventType;
+        try {
+            eventType = HCatEventMessage.EventType.valueOf(eventTypeString);
+        }
+        catch (IllegalArgumentException unknownEventType) {
+            // Enum.valueOf() rejects unknown names before the switch is reached, with a generic
+            // "No enum constant" message. Re-throw with the same message as the default-branch,
+            // so that callers see one consistent error for every unsupported event-type.
+            throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString, unknownEventType);
+        }
+
+        switch (eventType) {
+            case CREATE_DATABASE:
+                return getCreateDatabaseMessage(messageBody);
+            case DROP_DATABASE:
+                return getDropDatabaseMessage(messageBody);
+            case CREATE_TABLE:
+                return getCreateTableMessage(messageBody);
+            case DROP_TABLE:
+                return getDropTableMessage(messageBody);
+            case ADD_PARTITION:
+                return getAddPartitionMessage(messageBody);
+            case DROP_PARTITION:
+                return getDropPartitionMessage(messageBody);
+
+            default:
+                // Reached only for EventType constants that have no case above.
+                throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
+        }
+    }
+
+    /**
+     * Method to de-serialize CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage getCreateDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage getDropDatabaseMessage(String messageBody);
+
+    /**
+     * Method to de-serialize CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage getCreateTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropTableMessage instance.
+     */
+    public abstract DropTableMessage getDropTableMessage(String messageBody);
+
+    /**
+     * Method to de-serialize AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage getAddPartitionMessage(String messageBody);
+
+    /**
+     * Method to de-serialize DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage getDropPartitionMessage(String messageBody);
+
+    // Protection against construction.
+    protected MessageDeserializer() {}
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/MessageFactory.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.messaging.json.JSONMessageFactory;
+
+/**
+ * Abstract Factory for the construction of HCatalog message instances.
+ * The concrete implementation is chosen through configuration: "hcatalog.message.format"
+ * names the format (default "json"), and "hcatalog.message.factory.impl.&lt;format&gt;" names the
+ * implementing class (default: the JSON implementation).
+ */
+public abstract class MessageFactory {
+
+    // Created lazily by getInstance(). An eagerly constructed JSON factory here would make the
+    // configured implementation unreachable, and would also run a sub-class constructor before
+    // the static fields below have been initialized.
+    private static MessageFactory instance;
+
+    protected static final HiveConf hiveConf = new HiveConf();
+    static {
+        hiveConf.addResource("hive-site.xml");
+    }
+
+    private static final String CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX = "hcatalog.message.factory.impl.";
+    private static final String CONF_LABEL_HCAT_MESSAGE_FORMAT = "hcatalog.message.format";
+    private static final String HCAT_MESSAGE_FORMAT = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FORMAT, "json");
+    private static final String DEFAULT_MESSAGE_FACTORY_IMPL = "org.apache.hcatalog.messaging.json.JSONMessageFactory";
+    private static final String HCAT_MESSAGE_FACTORY_IMPL = hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX
+                                                                         + HCAT_MESSAGE_FORMAT,
+                                                                         DEFAULT_MESSAGE_FACTORY_IMPL);
+
+    // ConfVars.name() is the enum-constant identifier (e.g. "METASTOREURIS"), which is not a
+    // configuration key, so looking it up always produced the "" default. The key itself is
+    // held in ConfVars.varname.
+    protected static final String HCAT_SERVER_URL = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
+    protected static final String HCAT_SERVICE_PRINCIPAL = hiveConf.get(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, "");
+
+    /**
+     * Getter for MessageFactory instance.
+     * The instance is built on first use from the configured implementation class, and is
+     * re-used for all subsequent calls.
+     * @return The shared MessageFactory.
+     * @throws IllegalStateException if the configured implementation class cannot be found.
+     */
+    public static synchronized MessageFactory getInstance() {
+        if (instance == null) {
+            instance = getInstance(HCAT_MESSAGE_FACTORY_IMPL);
+        }
+        return instance;
+    }
+
+    /**
+     * Reflectively constructs the MessageFactory implementation with the given class-name.
+     * @param className Fully qualified name of a MessageFactory sub-class.
+     * @return A new instance of that class.
+     * @throws IllegalStateException if the class cannot be found.
+     */
+    private static MessageFactory getInstance(String className) {
+        try {
+            return (MessageFactory)ReflectionUtils.newInstance(Class.forName(className), hiveConf);
+        }
+        catch (ClassNotFoundException classNotFound) {
+            throw new IllegalStateException("Could not construct MessageFactory implementation: " + className,
+                                            classNotFound);
+        }
+    }
+
+    /**
+     * Getter for MessageDeserializer, corresponding to the specified format and version.
+     * A fresh factory is constructed for the requested format on every call; the shared
+     * instance returned by getInstance() is not consulted.
+     * @param format Serialization format for notifications.
+     * @param version Version of serialization format (currently ignored.)
+     * @return MessageDeserializer.
+     */
+    public static MessageDeserializer getDeserializer(String format,
+                                                      String version) {
+        return getInstance(hiveConf.get(CONF_LABEL_HCAT_MESSAGE_FACTORY_IMPL_PREFIX + format,
+                                        DEFAULT_MESSAGE_FACTORY_IMPL)).getDeserializer();
+    }
+
+    /**
+     * Getter for the MessageDeserializer matching this factory's message-format.
+     */
+    public abstract MessageDeserializer getDeserializer();
+
+    /**
+     * Getter for version-string, corresponding to all constructed messages.
+     */
+    public abstract String getVersion();
+
+    /**
+     * Getter for message-format.
+     */
+    public abstract String getMessageFormat();
+
+    /**
+     * Factory method for CreateDatabaseMessage.
+     * @param db The Database being added.
+     * @return CreateDatabaseMessage instance.
+     */
+    public abstract CreateDatabaseMessage buildCreateDatabaseMessage(Database db);
+
+    /**
+     * Factory method for DropDatabaseMessage.
+     * @param db The Database being dropped.
+     * @return DropDatabaseMessage instance.
+     */
+    public abstract DropDatabaseMessage buildDropDatabaseMessage(Database db);
+
+    /**
+     * Factory method for CreateTableMessage.
+     * @param table The Table being created.
+     * @return CreateTableMessage instance.
+     */
+    public abstract CreateTableMessage buildCreateTableMessage(Table table);
+
+    /**
+     * Factory method for DropTableMessage.
+     * @param table The Table being dropped.
+     * @return DropTableMessage instance.
+     */
+    public abstract DropTableMessage buildDropTableMessage(Table table);
+
+    /**
+     * Factory method for AddPartitionMessage.
+     * @param table The Table to which the partition is added.
+     * @param partition The Partition being added.
+     * @return AddPartitionMessage instance.
+     */
+    public abstract AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition);
+
+    /**
+     * Factory method for DropPartitionMessage.
+     * @param table The Table from which the partition is dropped.
+     * @param partition The Partition being dropped.
+     * @return DropPartitionMessage instance.
+     */
+    public abstract DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition);
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/jms/MessagingUtils.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.jms;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+
+/**
+ * Helper Utility to assist consumers of HCat Messages in extracting
+ * message-content from JMS messages.
+ */
+public class MessagingUtils {
+
+    /**
+     * Method to return HCatEventMessage contained in the JMS message.
+     * The event-type, message-version and message-format are read from the JMS
+     * string-properties named by HCatConstants; the body is the text of the message.
+     * @param message The JMS Message instance. Must be a TextMessage.
+     * @return The contained HCatEventMessage
+     * @throws IllegalArgumentException if the message is null or not a TextMessage, if its body
+     *         or event-type is null/empty, or if reading from the JMS message fails.
+     */
+    public static HCatEventMessage getMessage(Message message) {
+        // Reject anything that cannot be cast below, rather than surfacing a
+        // ClassCastException/NullPointerException to the consumer.
+        if (!(message instanceof TextMessage))
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                               "Expected a JMS TextMessage, but received: " +
+                                               (message == null ? "null" : message.getClass().getName()));
+
+        try {
+            String messageBody    = ((TextMessage)message).getText();
+            String eventType      = message.getStringProperty(HCatConstants.HCAT_EVENT);
+            String messageVersion = message.getStringProperty(HCatConstants.HCAT_MESSAGE_VERSION);
+            String messageFormat  = message.getStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT);
+
+            if (StringUtils.isEmpty(messageBody) || StringUtils.isEmpty(eventType))
+                throw new IllegalArgumentException("Could not extract HCatEventMessage. " +
+                                                   "EventType and/or MessageBody is null/empty.");
+
+            return MessageFactory.getDeserializer(messageFormat, messageVersion).getHCatEventMessage(eventType, messageBody);
+        }
+        catch (JMSException exception) {
+            throw new IllegalArgumentException("Could not extract HCatEventMessage. ", exception);
+        }
+    }
+
+    // Prevent construction.
+    private MessagingUtils() {}
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONAddPartitionMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * AddPartitionMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONAddPartitionMessage extends AddPartitionMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONAddPartitionMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONAddPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                   List<Map<String,String>> partitions, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.partitions = partitions;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateDatabaseMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateDatabaseMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONCreateDatabaseMessage extends CreateDatabaseMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONCreateDatabaseMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONCreateDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONCreateTableMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * CreateTableMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONCreateTableMessage extends CreateTableMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONCreateTableMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONCreateTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropDatabaseMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DropDatabaseMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONDropDatabaseMessage extends DropDatabaseMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONDropDatabaseMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONDropDatabaseMessage(String server, String servicePrincipal, String db, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropPartitionMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * DropPartitionMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONDropPartitionMessage extends DropPartitionMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    @JsonProperty
+    List<Map<String, String>> partitions;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONDropPartitionMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONDropPartitionMessage(String server, String servicePrincipal, String db, String table,
+                                    List<Map<String,String>> partitions, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.partitions = partitions;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    @Override
+    public List<Map<String, String>> getPartitions() {
+        return partitions;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONDropTableMessage.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * DropTableMessage implementation whose state is held in Jackson-annotated fields,
+ * and whose String-form is the JSON produced by the shared ObjectMapper.
+ */
+public class JSONDropTableMessage extends DropTableMessage {
+
+    @JsonProperty
+    String server;
+
+    @JsonProperty
+    String servicePrincipal;
+
+    @JsonProperty
+    String db;
+
+    @JsonProperty
+    String table;
+
+    @JsonProperty
+    Long timestamp;
+
+    /**
+     * No-argument constructor. Jackson needs it to instantiate the message
+     * before populating the fields. Performs no validation.
+     */
+    public JSONDropTableMessage() {
+    }
+
+    /**
+     * Populates every field and then verifies the message via checkValid().
+     */
+    public JSONDropTableMessage(String server, String servicePrincipal, String db, String table, Long timestamp) {
+        this.server = server;
+        this.servicePrincipal = servicePrincipal;
+        this.db = db;
+        this.table = table;
+        this.timestamp = timestamp;
+
+        checkValid();
+    }
+
+    @Override
+    public String getTable() {
+        return table;
+    }
+
+    @Override
+    public String getServer() {
+        return server;
+    }
+
+    @Override
+    public String getServicePrincipal() {
+        return servicePrincipal;
+    }
+
+    @Override
+    public String getDB() {
+        return db;
+    }
+
+    @Override
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Serializes this message with JSONMessageDeserializer.mapper.
+     * @throws IllegalArgumentException wrapping any failure raised during serialization.
+     */
+    @Override
+    public String toString() {
+        final String json;
+        try {
+            json = JSONMessageDeserializer.mapper.writeValueAsString(this);
+        }
+        catch (Exception serializationFailure) {
+            throw new IllegalArgumentException("Could not serialize: ", serializationFailure);
+        }
+        return json;
+    }
+
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageDeserializer.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.codehaus.jackson.map.DeserializationConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * MessageDeserializer implementation, for deserializing from JSON strings.
+ * Every message-type is read with the one shared ObjectMapper, which is configured
+ * to ignore JSON properties that the target class does not declare.
+ */
+public class JSONMessageDeserializer extends MessageDeserializer {
+
+    static ObjectMapper mapper = new ObjectMapper(); // Thread-safe.
+
+    static {
+        mapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    }
+
+    /**
+     * Reads messageBody as an instance of messageClass.
+     * Any failure is re-thrown as an IllegalArgumentException carrying failureText.
+     */
+    private static <T> T parse(String messageBody, Class<T> messageClass, String failureText) {
+        try {
+            return mapper.readValue(messageBody, messageClass);
+        }
+        catch (Exception parseFailure) {
+            throw new IllegalArgumentException(failureText, parseFailure);
+        }
+    }
+
+    @Override
+    public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) {
+        return parse(messageBody, JSONCreateDatabaseMessage.class,
+                     "Could not construct JSONCreateDatabaseMessage.");
+    }
+
+    @Override
+    public DropDatabaseMessage getDropDatabaseMessage(String messageBody) {
+        return parse(messageBody, JSONDropDatabaseMessage.class,
+                     "Could not construct JSONDropDatabaseMessage.");
+    }
+
+    @Override
+    public CreateTableMessage getCreateTableMessage(String messageBody) {
+        return parse(messageBody, JSONCreateTableMessage.class,
+                     "Could not construct JSONCreateTableMessage.");
+    }
+
+    @Override
+    public DropTableMessage getDropTableMessage(String messageBody) {
+        return parse(messageBody, JSONDropTableMessage.class,
+                     "Could not construct JSONDropTableMessage.");
+    }
+
+    @Override
+    public AddPartitionMessage getAddPartitionMessage(String messageBody) {
+        return parse(messageBody, JSONAddPartitionMessage.class,
+                     "Could not construct AddPartitionMessage.");
+    }
+
+    @Override
+    public DropPartitionMessage getDropPartitionMessage(String messageBody) {
+        return parse(messageBody, JSONDropPartitionMessage.class,
+                     "Could not construct DropPartitionMessage.");
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/messaging/json/JSONMessageFactory.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.messaging.json;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.messaging.AddPartitionMessage;
+import org.apache.hcatalog.messaging.CreateDatabaseMessage;
+import org.apache.hcatalog.messaging.CreateTableMessage;
+import org.apache.hcatalog.messaging.DropDatabaseMessage;
+import org.apache.hcatalog.messaging.DropPartitionMessage;
+import org.apache.hcatalog.messaging.DropTableMessage;
+import org.apache.hcatalog.messaging.MessageDeserializer;
+import org.apache.hcatalog.messaging.MessageFactory;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * The JSON implementation of the MessageFactory. Constructs JSON implementations of
+ * each message-type.
+ *
+ * Every message built here carries HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL and a
+ * timestamp expressed in whole seconds.
+ */
+public class JSONMessageFactory extends MessageFactory {
+
+    // One deserializer shared by all factory instances; final so it cannot be reassigned.
+    private static final JSONMessageDeserializer deserializer = new JSONMessageDeserializer();
+
+    /** Returns the shared {@link JSONMessageDeserializer}. */
+    @Override
+    public MessageDeserializer getDeserializer() {
+        return deserializer;
+    }
+
+    /** Returns the version string of this message format, currently {@code "0.1"}. */
+    @Override
+    public String getVersion() {
+        return "0.1";
+    }
+
+    /** Returns the format identifier {@code "json"}. */
+    @Override
+    public String getMessageFormat() {
+        return "json";
+    }
+
+    /**
+     * Builds the create-database message for {@code db}.
+     *
+     * @param db the database whose name is recorded in the message
+     * @return a {@link JSONCreateDatabaseMessage} stamped with the current time in seconds
+     */
+    @Override
+    public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) {
+        return new JSONCreateDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(),
+            currentTimeSeconds());
+    }
+
+    /**
+     * Builds the drop-database message for {@code db}.
+     *
+     * @param db the database whose name is recorded in the message
+     * @return a {@link JSONDropDatabaseMessage} stamped with the current time in seconds
+     */
+    @Override
+    public DropDatabaseMessage buildDropDatabaseMessage(Database db) {
+        return new JSONDropDatabaseMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, db.getName(),
+            currentTimeSeconds());
+    }
+
+    /**
+     * Builds the create-table message for {@code table}.
+     *
+     * @param table the table whose database name and table name are recorded
+     * @return a {@link JSONCreateTableMessage} stamped with the current time in seconds
+     */
+    @Override
+    public CreateTableMessage buildCreateTableMessage(Table table) {
+        return new JSONCreateTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
+            table.getTableName(), currentTimeSeconds());
+    }
+
+    /**
+     * Builds the drop-table message for {@code table}.
+     *
+     * @param table the table whose database name and table name are recorded
+     * @return a {@link JSONDropTableMessage} stamped with the current time in seconds
+     */
+    @Override
+    public DropTableMessage buildDropTableMessage(Table table) {
+        return new JSONDropTableMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, table.getDbName(),
+            table.getTableName(), currentTimeSeconds());
+    }
+
+    /**
+     * Builds the add-partition message for {@code partition}.
+     *
+     * @param table     supplies the partition-key names
+     * @param partition supplies the database name, table name and partition values
+     * @return a {@link JSONAddPartitionMessage} holding a one-element list with the
+     *         partition's key/value map, stamped with the current time in seconds
+     */
+    @Override
+    public AddPartitionMessage buildAddPartitionMessage(Table table, Partition partition) {
+        return new JSONAddPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
+            partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)),
+            currentTimeSeconds());
+    }
+
+    /**
+     * Builds the drop-partition message for {@code partition}.
+     *
+     * @param table     supplies the partition-key names
+     * @param partition supplies the database name, table name and partition values
+     * @return a {@link JSONDropPartitionMessage} holding a one-element list with the
+     *         partition's key/value map, stamped with the current time in seconds
+     */
+    @Override
+    public DropPartitionMessage buildDropPartitionMessage(Table table, Partition partition) {
+        return new JSONDropPartitionMessage(HCAT_SERVER_URL, HCAT_SERVICE_PRINCIPAL, partition.getDbName(),
+            partition.getTableName(), Arrays.asList(getPartitionKeyValues(table, partition)),
+            currentTimeSeconds());
+    }
+
+    /** Returns the current system time truncated to whole seconds. */
+    private static long currentTimeSeconds() {
+        return System.currentTimeMillis() / 1000;
+    }
+
+    /**
+     * Pairs each partition-key name of {@code table} with the value found at the same
+     * index in {@code partition.getValues()}.
+     *
+     * @return a map from key name to value; a LinkedHashMap, so iteration follows the
+     *         order in which the table lists its partition keys
+     */
+    private static Map<String, String> getPartitionKeyValues(Table table, Partition partition) {
+        Map<String, String> partitionKeys = new LinkedHashMap<String, String>();
+        for (int i = 0; i < table.getPartitionKeysSize(); ++i) {
+            partitionKeys.put(table.getPartitionKeys().get(i).getName(),
+                partition.getValues().get(i));
+        }
+        return partitionKeys;
+    }
+}
Added: hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1520466&view=auto
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java (added)
+++ hive/trunk/hcatalog/server-extensions/src/test/java/org/apache/hcatalog/listener/TestMsgBusConnection.java Fri Sep 6 00:49:14 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.messaging.HCatEventMessage;
+import org.apache.hcatalog.messaging.jms.MessagingUtils;
+
+/**
+ * Checks that database events produced with {@link NotificationListener} registered as a
+ * metastore event listener arrive on the message-bus topic, both before and after the
+ * embedded broker has been stopped and started again.
+ */
+public class TestMsgBusConnection extends TestCase {
+
+    private static final String BROKER_URL = "tcp://localhost:61616";
+    private static final String TOPIC_NAME = "planetlab.hcat";
+    // Upper bound on waiting for a single event, so a lost message fails the test
+    // instead of blocking the run forever.
+    private static final long RECEIVE_TIMEOUT_MS = 60 * 1000L;
+
+    private Driver driver;
+    private BrokerService broker;
+    private Connection connection;
+    private MessageConsumer consumer;
+
+    /**
+     * Starts a non-persistent broker, subscribes a client to the topic, and creates a
+     * Hive driver whose configuration registers {@link NotificationListener}.
+     */
+    @Override
+    protected void setUp() throws Exception {
+
+        super.setUp();
+        broker = new BrokerService();
+        // configure the broker
+        broker.addConnector(BROKER_URL + "?broker.persistent=false");
+
+        broker.start();
+
+        // JNDI settings pointing at the broker started above.
+        // NOTE(review): presumably read by NotificationListener when it looks up its
+        // connection factory - confirm against the listener.
+        System.setProperty("java.naming.factory.initial",
+            "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+        System.setProperty("java.naming.provider.url", BROKER_URL);
+        connectClient();
+        HiveConf hiveConf = new HiveConf(this.getClass());
+        hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,
+            NotificationListener.class.getName());
+        hiveConf.set("hive.metastore.local", "true");
+        hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+        hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        hiveConf.set(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX, TOPIC_NAME);
+        SessionState.start(new CliSessionState(hiveConf));
+        driver = new Driver(hiveConf);
+    }
+
+    /** Closes the client connection opened by this test and stops the broker. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            closeClient();
+            if (broker != null) {
+                broker.stop();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Opens a fresh connection to the broker and subscribes {@link #consumer} to the
+     * topic, closing any connection left over from an earlier call.
+     */
+    private void connectClient() throws JMSException {
+        closeClient();
+        ConnectionFactory connFac = new ActiveMQConnectionFactory(BROKER_URL);
+        connection = connFac.createConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Destination hcatTopic = session.createTopic(TOPIC_NAME);
+        consumer = session.createConsumer(hcatTopic);
+    }
+
+    /** Closes the current client connection, if any, on a best-effort basis. */
+    private void closeClient() {
+        if (connection == null) {
+            return;
+        }
+        try {
+            connection.close();
+        } catch (JMSException ignored) {
+            // Best effort: the broker this connection pointed at may already have been
+            // stopped by the test, and there is nothing further to clean up.
+        } finally {
+            connection = null;
+            consumer = null;
+        }
+    }
+
+    /**
+     * Creates a database and expects its event, restarts the broker, then expects the
+     * events of a second create and a drop. Any exception propagates so that JUnit
+     * reports it; it is not swallowed behind a Java {@code assert}, which is a no-op
+     * unless the JVM runs with assertions enabled.
+     */
+    public void testConnection() throws Exception {
+
+        driver.run("create database testconndb");
+        assertDatabaseEvent(HCatConstants.HCAT_CREATE_DATABASE_EVENT, receiveEvent());
+
+        // This drop runs while the broker is down. The next event asserted after the
+        // restart is the create below, i.e. this drop is not expected on the topic.
+        broker.stop();
+        driver.run("drop database testconndb cascade");
+        broker.start(true);
+        connectClient();
+
+        driver.run("create database testconndb");
+        assertDatabaseEvent(HCatConstants.HCAT_CREATE_DATABASE_EVENT, receiveEvent());
+
+        driver.run("drop database testconndb cascade");
+        assertDatabaseEvent(HCatConstants.HCAT_DROP_DATABASE_EVENT, receiveEvent());
+    }
+
+    /** Waits up to {@link #RECEIVE_TIMEOUT_MS} for the next message and fails if none arrives. */
+    private Message receiveEvent() throws JMSException {
+        Message msg = consumer.receive(RECEIVE_TIMEOUT_MS);
+        assertNotNull("No event received within " + RECEIVE_TIMEOUT_MS + " ms", msg);
+        return msg;
+    }
+
+    /**
+     * Asserts that {@code msg} is a text message for {@code expectedEvent} on the test
+     * topic and that its decoded body names database {@code testconndb}. The body is
+     * decoded from {@code msg} itself, so each received message is checked on its own.
+     */
+    private static void assertDatabaseEvent(String expectedEvent, Message msg) throws Exception {
+        assertTrue("Expected TextMessage", msg instanceof TextMessage);
+        assertEquals(expectedEvent, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+        assertEquals("topic://" + TOPIC_NAME, msg.getJMSDestination().toString());
+        HCatEventMessage messageObject = MessagingUtils.getMessage(msg);
+        assertEquals("testconndb", messageObject.getDB());
+    }
+}