You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by ha...@apache.org on 2011/06/14 21:16:48 UTC

svn commit: r1135798 - in /incubator/hcatalog/trunk: CHANGES.txt ivy.xml src/java/org/apache/hcatalog/listener/NotificationListener.java src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java

Author: hashutosh
Date: Tue Jun 14 21:16:47 2011
New Revision: 1135798

URL: http://svn.apache.org/viewvc?rev=1135798&view=rev
Log:
HCATALOG-39: Lazily create connection for Message bus

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/ivy.xml
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Jun 14 21:16:47 2011
@@ -12,6 +12,9 @@ Trunk (unreleased changes)
     (Krishna Kumar via macyang)
     
   IMPROVEMENTS
+
+    HCAT-39. Lazily create connection for Message bus (hashutosh)
+
     HCAT-44. Add a releaseaudit target to build.xml (gates)
 
     HCAT-40. Remove dependencies from the HCatalog client jar (macyang)

Modified: incubator/hcatalog/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/ivy.xml?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/ivy.xml (original)
+++ incubator/hcatalog/trunk/ivy.xml Tue Jun 14 21:16:47 2011
@@ -45,7 +45,7 @@
           conf="common->master"/>
           -->
         <dependency org="javax.jms" name="jms" rev="${jms.version}" conf="common->master" />
-        <dependency org="org.apache.activemq" name="activemq-core" rev="${activemq.version}" conf="common->master" />
+        <dependency org="org.apache.activemq" name="activemq-all" rev="${activemq.version}" conf="common->master" />
         <dependency org="javax.management.j2ee" name="management-api" rev="${javax-mgmt.version}" conf="common->master" /> 
         <dependency org="com.google.code.p.arat" name="rat-lib" rev="${rats-lib.version}" conf="releaseaudit->default"/>
 </dependencies>

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1135798&r1=1135797&r2=1135798&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/listener/NotificationListener.java Tue Jun 14 21:16:47 2011
@@ -22,11 +22,11 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 
-import javax.jdo.PersistenceManager;
-import javax.jdo.Query;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
@@ -35,16 +35,11 @@ import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
-import org.apache.hadoop.hive.metastore.ObjectStore;
-import org.apache.hadoop.hive.metastore.RawStore;
-import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler.Command;
-import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
@@ -82,26 +77,7 @@ public class NotificationListener extend
 	public NotificationListener(final Configuration conf) {
 
 		super(conf);
-		try {
-			Context jndiCntxt = new InitialContext();
-			ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
-			conn = connFac.createConnection();
-			conn.start();
-			// We want message to be sent when session commits, thus we run in
-			// transacted mode.
-			session = conn.createSession(true, Session.SESSION_TRANSACTED);
-
-		} catch (NamingException e) {
-			LOG.error("JNDI error while setting up Message Bus connection. " +
-					"Please make sure file named 'jndi.properties' is in " +
-					"classpath and contains appropriate key-value pairs.",e);
-		}
-		catch (JMSException e) {
-			LOG.error("Failed to initialize connection to message bus",e);
-		}
-		catch(Throwable t){
-			LOG.error("HCAT Listener failed to load",t);
-		}
+		createConnection();
 	}
 
 	@Override
@@ -227,18 +203,36 @@ public class NotificationListener extend
 	 */
 	private void send(Serializable msgBody, String topicName, String event){
 
-		if(null == session){
-			// If we weren't able to setup the session in the constructor
-			// we cant send message in any case.
-			LOG.error("Invalid session. Failed to send message on topic: "+
-					topicName + " event: "+event);
-			return;
-		}
-
 		try{
-			// Topics are created on demand. If it doesn't exist on broker it will
-			// be created when broker receives this message.
-			Destination topic = session.createTopic(topicName);
+
+			Destination topic = null;
+			if(null == session){
+				// this will happen, if we never able to establish a connection.
+				createConnection();
+				if (null == session){
+					// Still not successful, return from here.
+					LOG.error("Invalid session. Failed to send message on topic: "+
+							topicName + " event: "+event);				
+					return;
+				}
+			}
+			try{
+				// Topics are created on demand. If it doesn't exist on broker it will
+				// be created when broker receives this message.
+				topic = session.createTopic(topicName);				
+			} catch (IllegalStateException ise){
+				// this will happen if we were able to establish connection once, but its no longer valid,
+				// ise is thrown, catch it and retry.
+				LOG.error("Seems like connection is lost. Retrying", ise);
+				createConnection();
+				topic = session.createTopic(topicName);				
+			}
+			if (null == topic){
+				// Still not successful, return from here.
+				LOG.error("Invalid session. Failed to send message on topic: "+
+						topicName + " event: "+event);				
+				return;
+			}
 			MessageProducer producer = session.createProducer(topic);
 			ObjectMessage msg = session.createObjectMessage(msgBody);
 			msg.setStringProperty(HCatConstants.HCAT_EVENT, event);
@@ -252,13 +246,44 @@ public class NotificationListener extend
 		}
 	}
 
+	private void createConnection(){
+
+		Context jndiCntxt;
+		try {
+			jndiCntxt = new InitialContext();
+			ConnectionFactory connFac = (ConnectionFactory)jndiCntxt.lookup("ConnectionFactory");
+			Connection conn = connFac.createConnection();
+			conn.start();
+			conn.setExceptionListener(new ExceptionListener() {
+				@Override
+				public void onException(JMSException jmse) {
+						LOG.error(jmse);
+				}
+			});
+			// We want message to be sent when session commits, thus we run in
+			// transacted mode.
+			session = conn.createSession(true, Session.SESSION_TRANSACTED);
+		} catch (NamingException e) {
+			LOG.error("JNDI error while setting up Message Bus connection. " +
+					"Please make sure file named 'jndi.properties' is in " +
+					"classpath and contains appropriate key-value pairs.",e);
+		} catch (JMSException e) {
+			LOG.error("Failed to initialize connection to message bus",e);
+		} catch(Throwable t){
+			LOG.error("Unable to connect to JMS provider",t);
+		}
+	}
+
 	@Override
 	protected void finalize() throws Throwable {
 		// Close the connection before dying.
 		try {
+			if (null != session)
+				session.close();
 			if(conn != null) {
 				conn.close();
 			}
+			
 		} catch (Exception ignore) {
 			LOG.info("Failed to close message bus connection.", ignore);
 		}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java?rev=1135798&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/listener/TestMsgBusConnection.java Tue Jun 14 21:16:47 2011
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.listener;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.common.HCatConstants;
+
+public class TestMsgBusConnection extends TestCase{
+
+	private Driver driver;
+	private BrokerService broker;
+	private MessageConsumer consumer;
+
+	@Override
+	protected void setUp() throws Exception {
+
+		super.setUp();
+		broker = new BrokerService();
+		// configure the broker
+		broker.addConnector("tcp://localhost:61616?broker.persistent=false");
+
+		broker.start();
+
+		System.setProperty("java.naming.factory.initial", "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+		System.setProperty("java.naming.provider.url", "tcp://localhost:61616");
+		connectClient();
+		HiveConf hiveConf = new HiveConf(this.getClass());
+		hiveConf.set(ConfVars.METASTORE_EVENT_LISTENERS.varname,NotificationListener.class.getName());
+		hiveConf.set("hive.metastore.local", "true");
+		hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+		hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+		hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+		SessionState.start(new CliSessionState(hiveConf));
+		driver = new Driver(hiveConf);
+	}
+
+	private void connectClient() throws JMSException{
+		ConnectionFactory connFac = new ActiveMQConnectionFactory("tcp://localhost:61616");
+		Connection conn = connFac.createConnection();
+		conn.start();
+		Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+		Destination hcatTopic = session.createTopic(HCatConstants.HCAT_TOPIC);
+		consumer = session.createConsumer(hcatTopic);
+	}
+
+	public void testConnection() throws Exception{
+
+		try{
+			driver.run("create database testconndb");
+			Message msg = consumer.receive();
+			assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+			broker.stop();
+			driver.run("drop database testconndb cascade");
+			broker.start(true);
+			connectClient();
+			driver.run("create database testconndb");
+			msg = consumer.receive();
+			assertEquals(HCatConstants.HCAT_ADD_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+			driver.run("drop database testconndb cascade");
+			msg = consumer.receive();
+			assertEquals(HCatConstants.HCAT_DROP_DATABASE_EVENT, msg.getStringProperty(HCatConstants.HCAT_EVENT));
+			assertEquals("topic://"+HCatConstants.HCAT_TOPIC,msg.getJMSDestination().toString());
+			assertEquals("testconndb", ((Database) ((ObjectMessage)msg).getObject()).getName());
+		} catch (NoSuchObjectException nsoe){
+			nsoe.printStackTrace(System.err);
+			assert false;
+		} catch (AlreadyExistsException aee){
+			aee.printStackTrace(System.err);
+			assert false;
+		}
+	}
+}