You are viewing a plain text version of this content. The canonical link for it is here.
Posted to scm@geronimo.apache.org by db...@apache.org on 2005/11/17 13:23:46 UTC

svn commit: r345235 - /geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java

Author: dblevins
Date: Thu Nov 17 04:23:40 2005
New Revision: 345235

URL: http://svn.apache.org/viewcvs?rev=345235&view=rev
Log:
This guy doesn't quite work yet, but the idea is that when a connection goes down, we can easily reconnect and get back to work.

Added:
    geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java

Added: geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java
URL: http://svn.apache.org/viewcvs/geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java?rev=345235&view=auto
==============================================================================
--- geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java (added)
+++ geronimo/gbuild/trunk/gbuild-agent/src/main/java/org/apache/geronimo/gbuild/agent/MessagingClient.java Thu Nov 17 04:23:40 2005
@@ -0,0 +1,172 @@
+/**
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.geronimo.gbuild.agent;
+
+import org.codehaus.plexus.logging.Logger;
+import org.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.JMSException;
+import javax.jms.ExceptionListener;
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.jms.ObjectMessage;
+import javax.jms.Message;
+import javax.jms.DeliveryMode;
+import java.net.SocketException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MessagingClient implements ExceptionListener {
+
+    private final HashMap destinations;
+    private final String transportUrl;
+    private final ExceptionListener listener;
+    private final Logger logger;
+    private Session session;
+    private Connection connection;
+    private boolean failed;
+    private int maxAttempts = 10;
+
+    public MessagingClient(String transportUrl, Logger logger, ExceptionListener listener) throws JMSException {
+        this.transportUrl = transportUrl;
+        this.logger = logger;
+        this.destinations = new HashMap();
+        this.listener = listener;
+        connection = connect();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public Logger getLogger() {
+        return logger;
+    }
+
+    private void reset() throws JMSException {
+        getLogger().info("Resetting connection, session and destinations");
+        synchronized (destinations) {
+            connection = connect();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Reset all instances of Queue and Topic
+            for (Iterator iterator = destinations.entrySet().iterator(); iterator.hasNext();) {
+                Map.Entry entry = (Map.Entry) iterator.next();
+                String subject = (String) entry.getKey();
+                Destination destination = (Destination) entry.getValue();
+                if (destination instanceof Queue){
+                    Queue queue = session.createQueue(subject);
+                    destinations.put(subject, queue);
+                } else {
+                    Topic queue = session.createTopic(subject);
+                    destinations.put(subject, queue);
+                }
+            }
+        }
+    }
+
+    public void close() throws JMSException {
+        synchronized (destinations) {
+            session.close();
+            connection.close();
+        }
+    }
+
+    public void addTopic(String subject) throws JMSException {
+        synchronized (destinations) {
+            Topic queue = session.createTopic(subject);
+            destinations.put(subject, queue);
+        }
+    }
+
+    public void addQueue(String subject) throws JMSException {
+        synchronized (destinations) {
+            Queue queue = session.createQueue(subject);
+            destinations.put(subject, queue);
+        }
+    }
+
+    public void send(String subject, Serializable message) throws JMSException {
+        Destination destination = getDestination(subject);
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        ObjectMessage objectMessage = session.createObjectMessage(message);
+        producer.send(objectMessage);
+    }
+
+    public Message receive(String subject, long timeout) throws JMSException {
+        Destination destination = getDestination(subject);
+        MessageConsumer consumer = session.createConsumer(destination);
+        return consumer.receive(timeout);
+    }
+
+
+    private Destination getDestination(String subject) {
+        synchronized (destinations) {
+            return (Destination) destinations.get(subject);
+        }
+    }
+
+
+    private Connection connect() throws JMSException {
+        return connect(1);
+    }
+
+    private Connection connect(int tries) throws JMSException {
+        getLogger().debug("Attempt "+tries+" to establish connection to "+transportUrl);
+        try {
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(transportUrl);
+            Connection connection = connectionFactory.createConnection();
+            connection.start();
+            connection.setExceptionListener(this);
+            return connection;
+        } catch (JMSException e) {
+            if (tries >= maxAttempts){
+                throw e;
+            } else {
+                try {
+                    Thread.sleep(5000);
+                } catch (InterruptedException dontCare) {}
+                return connect(++tries);
+            }
+        }
+    }
+
+
+    public void onException(JMSException jmsException) {
+        getLogger().error(jmsException.getMessage());
+        Throwable cause = jmsException.getCause();
+        if (!failed && cause instanceof SocketException) {
+            try {
+                reset();
+            } catch (JMSException e) {
+                getLogger().error("Unable to restablish a connection to "+transportUrl);
+                failed = true;
+                listener.onException(jmsException);
+            }
+        } else {
+            listener.onException(jmsException);
+        }
+    }
+}