You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mi...@apache.org on 2013/05/08 01:14:13 UTC

svn commit: r1480128 - /hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java

Author: mithun
Date: Tue May  7 23:14:12 2013
New Revision: 1480128

URL: http://svn.apache.org/r1480128
Log:
HCATALOG-627 - Adding thread-safety to NotificationListener. (amalakar via mithun)

Modified:
    hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java

Modified: hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java?rev=1480128&r1=1480127&r2=1480128&view=diff
==============================================================================
--- hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java (original)
+++ hive/trunk/hcatalog/server-extensions/src/main/java/org/apache/hcatalog/listener/NotificationListener.java Tue May  7 23:14:12 2013
@@ -24,9 +24,9 @@ import java.util.HashMap;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
-import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageProducer;
@@ -76,17 +76,42 @@ import org.slf4j.LoggerFactory;
 public class NotificationListener extends MetaStoreEventListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NotificationListener.class);
-    protected Session session;
     protected Connection conn;
     private static MessageFactory messageFactory = MessageFactory.getInstance();
+    public static final int NUM_RETRIES = 1;
+    private static final String HEALTH_CHECK_TOPIC_SUFFIX = "jms_health_check";
+    private static final String HEALTH_CHECK_MSG = "HCAT_JMS_HEALTH_CHECK_MESSAGE";
+
+    protected final ThreadLocal<Session> session = new ThreadLocal<Session>() {
+        @Override
+        protected Session initialValue() {
+            try {
+                return createSession();
+            } catch (Exception e) {
+                LOG.error("Couldn't create JMS Session", e);
+                return null;
+            }
+        }
+
+        @Override
+        public void remove() {
+            if (get() != null) {
+                try {
+                    get().close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS session, ignored error", e);
+                }
+            }
+            super.remove();
+        }
+    };
 
     /**
      * Create message bus connection and session in constructor.
      */
     public NotificationListener(final Configuration conf) {
-
         super(conf);
-        createConnection();
+        testAndCreateConnection();
     }
 
     private static String getTopicName(Partition partition,
@@ -178,7 +203,7 @@ public class NotificationListener extend
         // Subscriber can get notification about drop of a database in HCAT
         // by listening on a topic named "HCAT" and message selector string
         // as "HCAT_EVENT = HCAT_DROP_DATABASE"
-        if (dbEvent.getStatus())  {
+        if (dbEvent.getStatus()) {
             String topicName = getTopicPrefix(dbEvent.getHandler().getHiveConf());
             send(messageFactory.buildDropDatabaseMessage(dbEvent.getDatabase()), topicName);
         }
@@ -216,7 +241,7 @@ public class NotificationListener extend
         }
     }
 
-    private String getTopicPrefix(HiveConf conf) {
+    private String getTopicPrefix(Configuration conf) {
         return conf.get(HCatConstants.HCAT_MSGBUS_TOPIC_PREFIX,
             HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX);
     }
@@ -253,86 +278,102 @@ public class NotificationListener extend
      * @param topicName is the name on message broker on which message is sent.
      */
     protected void send(HCatEventMessage hCatEventMessage, String topicName) {
-        try {
-            if(null == session){
-                // this will happen, if we never able to establish a connection.
-                createConnection();
-                if (null == session){
-                    // Still not successful, return from here.
-                    LOG.error("Invalid session. Failed to send message on topic: " +
-                            topicName + " event: " + hCatEventMessage.getEventType());
-                    return;
-                }
-            }
-
-            Destination topic = getTopic(topicName);
+        send(hCatEventMessage, topicName, NUM_RETRIES);
+    }
 
-            if (null == topic){
-                // Still not successful, return from here.
-                LOG.error("Invalid session. Failed to send message on topic: " +
-                        topicName + " event: " + hCatEventMessage.getEventType());
-                return;
+    /**
+     * @param hCatEventMessage The HCatEventMessage being sent over JMS, this method is threadsafe
+     * @param topicName is the name on message broker on which message is sent.
+     * @param retries the number of retry attempts
+     */
+    protected void send(HCatEventMessage hCatEventMessage, String topicName, int retries) {
+        try {
+            if (session.get() == null) {
+                // Need to reconnect
+                throw new JMSException("Invalid JMS session");
             }
-
-            MessageProducer producer = session.createProducer(topic);
-            Message msg = session.createTextMessage(hCatEventMessage.toString());
+            Destination topic = createTopic(topicName);
+            Message msg = session.get().createTextMessage(hCatEventMessage.toString());
 
             msg.setStringProperty(HCatConstants.HCAT_EVENT, hCatEventMessage.getEventType().toString());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_VERSION, messageFactory.getVersion());
             msg.setStringProperty(HCatConstants.HCAT_MESSAGE_FORMAT, messageFactory.getMessageFormat());
+            MessageProducer producer = createProducer(topic);
             producer.send(msg);
             // Message must be transacted before we return.
-            session.commit();
-        }
-        catch(Exception e){
-            // Gobble up the exception. Message delivery is best effort.
-            LOG.error("Failed to send message on topic: " + topicName +
-                    " event: " + hCatEventMessage.getEventType(), e);
+            session.get().commit();
+        } catch (Exception e) {
+            if (retries >= 0) {
+                // this may happen if we were able to establish connection once, but its no longer valid
+                LOG.error("Seems like connection is lost. Will retry. Retries left : " + retries + ". error was:", e);
+                testAndCreateConnection();
+                send(hCatEventMessage, topicName, retries - 1);
+            } else {
+                // Gobble up the exception. Message delivery is best effort.
+                LOG.error("Failed to send message on topic: " + topicName +
+                    " event: " + hCatEventMessage.getEventType() + " after retries: " + NUM_RETRIES, e);
+            }
         }
     }
 
     /**
-     * Get the topic object for the topicName, it also tries to reconnect
-     * if the connection appears to be broken.
+     * Get the topic object for the topicName
      *
      * @param topicName The String identifying the message-topic.
      * @return A {@link Topic} object corresponding to the specified topicName.
      * @throws JMSException
      */
-    protected Topic getTopic(final String topicName) throws JMSException {
-        Topic topic;
+    protected Topic createTopic(final String topicName) throws JMSException {
+        return session.get().createTopic(topicName);
+    }
+
+    /**
+     * Does a health check on the connection by sending a dummy message.
+     * Create the connection if the connection is found to be bad
+     * Also recreates the session
+     */
+    protected synchronized void testAndCreateConnection() {
+        if (conn != null) {
+            // This method is reached when error occurs while sending msg, so the session must be bad
+            session.remove();
+            if (!isConnectionHealthy()) {
+                // I am the first thread to detect the error, cleanup old connection & reconnect
+                try {
+                    conn.close();
+                } catch (Exception e) {
+                    LOG.error("Unable to close bad JMS connection, ignored error", e);
+                }
+                conn = createConnection();
+            }
+        } else {
+            conn = createConnection();
+        }
         try {
-            // Topics are created on demand. If it doesn't exist on broker it will
-            // be created when broker receives this message.
-            topic = session.createTopic(topicName);
-        } catch (IllegalStateException ise) {
-            // this will happen if we were able to establish connection once, but its no longer valid,
-            // ise is thrown, catch it and retry.
-            LOG.error("Seems like connection is lost. Retrying", ise);
-            createConnection();
-            topic = session.createTopic(topicName);
+            session.set(createSession());
+        } catch (JMSException e) {
+            LOG.error("Couldn't create JMS session, ignored the error", e);
         }
-        return topic;
     }
 
-    protected void createConnection() {
-
+    /**
+     * Create the JMS connection
+     * @return newly created JMS connection
+     */
+    protected Connection createConnection() {
+        LOG.info("Will create new JMS connection");
         Context jndiCntxt;
+        Connection jmsConnection = null;
         try {
             jndiCntxt = new InitialContext();
-            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt
-                .lookup("ConnectionFactory");
-            Connection conn = connFac.createConnection();
-            conn.start();
-            conn.setExceptionListener(new ExceptionListener() {
+            ConnectionFactory connFac = (ConnectionFactory) jndiCntxt.lookup("ConnectionFactory");
+            jmsConnection = connFac.createConnection();
+            jmsConnection.start();
+            jmsConnection.setExceptionListener(new ExceptionListener() {
                 @Override
                 public void onException(JMSException jmse) {
-                    LOG.error(jmse.toString());
+                    LOG.error("JMS Exception listener received exception. Ignored the error", jmse);
                 }
             });
-            // We want message to be sent when session commits, thus we run in
-            // transacted mode.
-            session = conn.createSession(true, Session.SESSION_TRANSACTED);
         } catch (NamingException e) {
             LOG.error("JNDI error while setting up Message Bus connection. "
                 + "Please make sure file named 'jndi.properties' is in "
@@ -342,20 +383,54 @@ public class NotificationListener extend
         } catch (Throwable t) {
             LOG.error("Unable to connect to JMS provider", t);
         }
+        return jmsConnection;
+    }
+
+    /**
+     * Send a dummy message to probe if the JMS connection is healthy
+     * @return true if connection is healthy, false otherwise
+     */
+    protected boolean isConnectionHealthy() {
+        try {
+            Topic topic = createTopic(getTopicPrefix(getConf()) + "." + HEALTH_CHECK_TOPIC_SUFFIX);
+            MessageProducer producer = createProducer(topic);
+            Message msg = session.get().createTextMessage(HEALTH_CHECK_MSG);
+            producer.send(msg, DeliveryMode.NON_PERSISTENT, 4, 0);
+        } catch (Exception e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Creates a JMS session
+     * @return newly create JMS session
+     * @throws JMSException
+     */
+    protected Session createSession() throws JMSException {
+        // We want message to be sent when session commits, thus we run in
+        // transacted mode.
+        return conn.createSession(true, Session.SESSION_TRANSACTED);
+    }
+
+    /**
+     * Create a JMS producer
+     * @param topic
+     * @return newly created message producer
+     * @throws JMSException
+     */
+    protected MessageProducer createProducer(Destination topic) throws JMSException {
+        return session.get().createProducer(topic);
     }
 
     @Override
     protected void finalize() throws Throwable {
-        // Close the connection before dying.
-        try {
-            if (null != session)
-                session.close();
-            if (conn != null) {
+        if (conn != null) {
+            try {
                 conn.close();
+            } catch (Exception e) {
+                LOG.error("Couldn't close jms connection, ignored the error", e);
             }
-
-        } catch (Exception ignore) {
-            LOG.info("Failed to close message bus connection.", ignore);
         }
     }