You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/06/14 21:16:48 UTC
svn commit: r1135798 - in /incubator/hcatalog/trunk: CHANGES.txt ivy.xml
src/java/org/apache/hcatalog/listener/NotificationListener.java
src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Author: hashutosh
Date: Tue Jun 14 21:16:47 2011
New Revision: 1135798
URL: http://svn.apache.org/viewvc?rev=1135798&view=rev
Log:
HCATALOG-39: Lazily create connection for Message bus
Added:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/ivy.xml
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Jun 14 21:16:47 2011
@@ -12,6 +12,9 @@ Trunk (unreleased changes)
(Krishna Kumar via macyang)
IMPROVEMENTS
+
+ HCAT-39. Lazily create connection for Message bus (hashutosh)
+
HCAT-44. Add a releaseaudit target to build.xml (gates)
HCAT-40. Remove dependencies from the HCatalog client jar (macyang)
Modified: incubator/hcatalog/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy.xml?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy.xml (original)
+++ incubator/hcatalog/trunk/ivy.xml Tue Jun 14 21:16:47 2011
@@ -45,7 +45,7 @@
conf="common->master"/>
-->
<dependency org="javax.jms" name="jms" rev="${jms.version}" conf="common->master" />
- <dependency org="org.apache.activemq" name="activemq-core" rev="${activemq.version}" conf="common->master" />
+ <dependency org="org.apache.activemq" name="activemq-all" rev="${activemq.version}" conf="common->master" />
<dependency org="javax.management.j2ee" name="management-api" rev="${javax-mgmt.version}" conf="common->master" />
<dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}" conf="releaseaudit->default"/>
</dependencies>
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Tue Jun 14 21:16:47 2011
@@ -22,11 +22,11 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import javax.jdo.PersistenceManager;
-import javax.jdo.Query;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@@ -35,16 +35,11 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
-import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -82,26 +77,7 @@ public class NotificationListener extend
public NotificationListener(final Configuration conf) {
super(conf);
- try {
- Context jndiCntxt = new InitialContext();
- ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
- conn = connFac.createConnection();
- conn.start();
- // We want message to be sent when session commits, thus we run in
- // transacted mode.
- session = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- } catch (NamingException e) {
- LOG.error("JNDI error while setting up Message Bus connection. " +
- "Please make sure file named 'jndi.properties' is in " +
- "classpath and contains appropriate key-value pairs.",e);
- }
- catch (JMSException e) {
- LOG.error("Failed to initialize connection to message bus",e);
- }
- catch(Throwable t){
- LOG.error("HCAT Listener failed to load",t);
- }
+ createConnection();
}
@Override
@@ -227,18 +203,36 @@ public class NotificationListener extend
*/
private void send(Serializable msgBody, String topicName, String event){
- if(null == session){
- // If we weren't able to setup the session in the constructor
- // we cant send message in any case.
- LOG.error("Invalid session. Failed to send message on topic: "+
- topicName + " event: "+event);
- return;
- }
-
try{
- // Topics are created on demand. If it doesn't exist on broker it will
- // be created when broker receives this message.
- Destination topic = session.createTopic(topicName);
+
+ Destination topic = null;
+ if(null == session){
+ // this will happen, if we never able to establish a connection.
+ createConnection();
+ if (null == session){
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "+
+ topicName + " event: "+event);
+ return;
+ }
+ }
+ try{
+ // Topics are created on demand. If it doesn't exist on broker it will
+ // be created when broker receives this message.
+ topic = session.createTopic(topicName);
+ } catch (IllegalStateException ise){
+ // this will happen if we were able to establish connection once, but its no longer valid,
+ // ise is thrown, catch it and retry.
+ LOG.error("Seems like connection is lost. Retrying", ise);
+ createConnection();
+ topic = session.createTopic(topicName);
+ }
+ if (null == topic){
+ // Still not successful, return from here.
+ LOG.error("Invalid session. Failed to send message on topic: "+
+ topicName + " event: "+event);
+ return;
+ }
MessageProducer producer = session.createProducer(topic);
ObjectMessage msg = session.createObjectMessage(msgBody);
msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
@@ -252,13 +246,44 @@ public class NotificationListener extend
}
}
+ private void createConnection(){
+
+ Context jndiCntxt;
+ try {
+ jndiCntxt = new InitialContext();
+ ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ conn.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException jmse) {
+ LOG.error(jmse);
+ }
+ });
+ // We want message to be sent when session commits, thus we run in
+ // transacted mode.
+ session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ } catch (NamingException e) {
+ LOG.error("JNDI error while setting up Message Bus connection. " +
+ "Please make sure file named 'jndi.properties' is in " +
+ "classpath and contains appropriate key-value pairs.",e);
+ } catch (JMSException e) {
+ LOG.error("Failed to initialize connection to message bus",e);
+ } catch(Throwable t){
+ LOG.error("Unable to connect to JMS provider",t);
+ }
+ }
+
@Override
protected void finalize() throws Throwable {
// Close the connection before dying.
try {
+ if (null != session)
+ session.close();
if(conn != null) {
conn.close();
}
+
} catch (Exception ignore) {
LOG.info("Failed to close message bus connection.", ignore);
}
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1135798&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Tue Jun 14 21:16:47 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+
+public class TestMsgBusConnection extends TestCase{
+
+ private Driver driver;
+ private BrokerService broker;
+ private MessageConsumer consumer;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+ broker = new BrokerService();
+ // configure the broker
+ broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+ broker.start();
+
+ System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+ connectClient();
+ HiveConf hiveConf = new HiveConf(this.getClass());
+ hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
+ hiveConf.set("hive.metastore.local", "true");
+ hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+ SessionState.start(new CliSessionState(hiveConf));
+ driver = new Driver(hiveConf);
+ }
+
+ private void connectClient() throws JMSException{
+ ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ Connection conn = connFac.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+ consumer = session.createConsumer(hcatTopic);
+ }
+
+ public void testConnection() throws Exception{
+
+ try{
+ driver.run("create database testconndb");
+ Message msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ broker.stop();
+ driver.run("drop database testconndb cascade");
+ broker.start(true);
+ connectClient();
+ driver.run("create database testconndb");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ driver.run("drop database testconndb cascade");
+ msg = consumer.receive();
+ assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+ assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+ assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+ } catch (NoSuchObjectException nsoe){
+ nsoe.printStackTrace(System.err);
+ assert false;
+ } catch (AlreadyExistsException aee){
+ aee.printStackTrace(System.err);
+ assert false;
+ }
+ }
+}