You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/29 00:07:22 UTC

svn commit: r1177088 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/

Author: tabish
Date: Wed Sep 28 22:07:21 2011
New Revision: 1177088

URL: http://svn.apache.org/viewvc?rev=1177088&view=rev
Log:
apply fixes for https://issues.apache.org/jira/browse/AMQ-3513

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Sep 28 22:07:21 2011
@@ -14,11 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.transport.failover;
 
 import java.io.BufferedReader;
-import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -37,6 +35,7 @@ import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -60,12 +59,9 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * A Transport that is made reliable by being able to fail over to another
  * transport when a transport failure is detected.
- * 
- * 
  */
 public class FailoverTransport implements CompositeTransport {
 
@@ -114,12 +110,12 @@ public class FailoverTransport implement
     //private boolean connectionInterruptProcessingComplete;
 
     private final TransportListener myTransportListener = createTransportListener();
-    private boolean updateURIsSupported=true;
-    private boolean reconnectSupported=true;
+    private boolean updateURIsSupported = true;
+    private boolean reconnectSupported = true;
     // remember for reconnect thread
     private SslContext brokerSslContext;
     private String updateURIsURL = null;
-    private boolean rebalanceUpdateURIs=true;
+    private boolean rebalanceUpdateURIs = true;
     private boolean doRebalance = false;
 
     public FailoverTransport() throws InterruptedIOException {
@@ -130,7 +126,6 @@ public class FailoverTransport implement
             public boolean iterate() {
                 boolean result = false;
                 boolean buildBackup = true;
-                boolean doReconnect = !disposed;
                 synchronized (backupMutex) {
                     if ((connectedTransport.get() == null || doRebalance) && !disposed) {
                         result = doReconnect();
@@ -170,11 +165,11 @@ public class FailoverTransport implement
                         ((Tracked) object).onResponses(command);
                     }
                 }
-                if (!initialized) {      
+                if (!initialized) {
                     initialized = true;
                 }
-                
-                if(command.isConnectionControl()) {
+
+                if (command.isConnectionControl()) {
                     handleConnectionControl((ConnectionControl) command);
                 }
                 if (transportListener != null) {
@@ -238,8 +233,7 @@ public class FailoverTransport implement
                 connectedTransportURI = null;
                 connected = false;
 
-                // notify before any reconnect attempt so ack state can be
-                // whacked
+                // notify before any reconnect attempt so ack state can be whacked
                 if (transportListener != null) {
                     transportListener.transportInterupted();
                 }
@@ -292,7 +286,6 @@ public class FailoverTransport implement
                         LOG.error("Failed to update transport URI's from: " + newTransports, e);
                     }
                 }
-
             }
         }
     }
@@ -416,8 +409,7 @@ public class FailoverTransport implement
     }
 
     /**
-     * @param randomize
-     *            The randomize to set.
+     * @param randomize The randomize to set.
      */
     public void setRandomize(boolean randomize) {
         this.randomize = randomize;
@@ -571,7 +563,6 @@ public class FailoverTransport implement
                                 // the outer catch
                                 throw e;
                             }
-
                         }
 
                         return;
@@ -613,9 +604,9 @@ public class FailoverTransport implement
 
     public void add(boolean rebalance, URI u[]) {
         boolean newURI = false;
-        for (int i = 0; i < u.length; i++) {
-            if (!contains(u[i])) {
-                uris.add(u[i]);
+        for (URI uri : u) {
+            if (!contains(uri)) {
+                uris.add(uri);
                 newURI = true;
             }
         }
@@ -625,8 +616,8 @@ public class FailoverTransport implement
     }
 
     public void remove(boolean rebalance, URI u[]) {
-        for (int i = 0; i < u.length; i++) {
-            uris.remove(u[i]);
+        for (URI uri : u) {
+            uris.remove(uri);
         }
         // rebalance is automatic if any connected to removed/stopped broker
     }
@@ -634,11 +625,11 @@ public class FailoverTransport implement
     public void add(boolean rebalance, String u) {
         try {
             URI newURI = new URI(u);
-            if (contains(newURI)==false) {
+            if (contains(newURI) == false) {
                 uris.add(newURI);
                 reconnect(rebalance);
             }
-       
+
         } catch (Exception e) {
             LOG.error("Failed to parse URI: " + u);
         }
@@ -680,7 +671,9 @@ public class FailoverTransport implement
         if (removed) {
             l.add(failedConnectTransportURI);
         }
-        LOG.debug("urlList connectionList:" + l + ", from: " + uris);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
+        }
         return l;
     }
 
@@ -715,12 +708,11 @@ public class FailoverTransport implement
         cc.setFaultTolerant(true);
         t.oneway(cc);
         stateTracker.restore(t);
-        Map tmpMap = null;
+        Map<Integer, Command> tmpMap = null;
         synchronized (requestMap) {
             tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
         }
-        for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
-            Command command = iter2.next();
+        for (Command command : tmpMap.values()) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("restore requestMap, replay: " + command);
             }
@@ -753,44 +745,50 @@ public class FailoverTransport implement
         return true;
     }
 
-    final boolean doReconnect() {
-        Exception failure = null;
-        synchronized (reconnectMutex) {
+    private void doUpdateURIsFromDisk() {
 
-            // If updateURIsURL is specified, read the file and add any new
-            // transport URI's to this FailOverTransport. 
-            // Note: Could track file timestamp to avoid unnecessary reading.
-            String fileURL = getUpdateURIsURL();
-            if (fileURL != null) {
-                BufferedReader in = null;
-                String newUris = null;
-                StringBuffer buffer = new StringBuffer();
-
-                try {
-                    in = new BufferedReader(getURLStream(fileURL));
-                    while (true) {
-                        String line = in.readLine();
-                        if (line == null) {
-                            break;
-                        }
-                        buffer.append(line);
-                    }
-                    newUris = buffer.toString();
-                } catch (IOException ioe) {
-                    LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
-                } finally {
-                    if (in != null) {
-                        try {
-                            in.close();
-                        } catch (IOException ioe) {
-                            // ignore
-                        }
+        // If updateURIsURL is specified, read the file and add any new
+        // transport URI's to this FailOverTransport.
+        // Note: Could track file timestamp to avoid unnecessary reading.
+        String fileURL = getUpdateURIsURL();
+        if (fileURL != null) {
+            BufferedReader in = null;
+            String newUris = null;
+            StringBuffer buffer = new StringBuffer();
+
+            try {
+                in = new BufferedReader(getURLStream(fileURL));
+                while (true) {
+                    String line = in.readLine();
+                    if (line == null) {
+                        break;
+                    }
+                    buffer.append(line);
+                }
+                newUris = buffer.toString();
+            } catch (IOException ioe) {
+                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+            } finally {
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (IOException ioe) {
+                        // ignore
                     }
                 }
-                
-                processNewTransports(isRebalanceUpdateURIs(), newUris);
             }
 
+            processNewTransports(isRebalanceUpdateURIs(), newUris);
+        }
+    }
+
+    final boolean doReconnect() {
+        Exception failure = null;
+        synchronized (reconnectMutex) {
+
+            // First ensure we are up to date.
+            doUpdateURIsFromDisk();
+
             if (disposed || connectionFailure != null) {
                 reconnectMutex.notifyAll();
             }
@@ -808,14 +806,18 @@ public class FailoverTransport implement
                             doRebalance = false;
                             return false;
                         } else {
-                            LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
+                            }
                             try {
                                 Transport transport = this.connectedTransport.getAndSet(null);
                                 if (transport != null) {
                                     disposeTransport(transport);
                                 }
                             } catch (Exception e) {
-                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
+                                }
                             }
                         }
                         doRebalance = false;
@@ -847,12 +849,27 @@ public class FailoverTransport implement
                         }
                     }
 
+                    // Sleep for the reconnectDelay
+                    if (!firstConnection && (reconnectDelay > 0) && !disposed) {
+                        synchronized (sleepMutex) {
+                            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+                            try {
+                                sleepMutex.wait(reconnectDelay);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                    }
+
                     Iterator<URI> iter = connectList.iterator();
                     while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
+
                         URI uri = iter.next();
                         Transport t = null;
                         try {
-                            LOG.debug("Attempting connect to: " + uri);
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Attempting connect to: " + uri);
+                            }
                             SslContext.setCurrentSslContext(brokerSslContext);
                             t = TransportFactory.compositeConnect(uri);
                             t.setTransportListener(myTransportListener);
@@ -924,8 +941,7 @@ public class FailoverTransport implement
                 connectionFailure = failure;
 
                 // Make sure on initial startup, that the transportListener has
-                // been initialized
-                // for this instance.
+                // been initialized for this instance.
                 synchronized (listenerMutex) {
                     if (transportListener == null) {
                         try {
@@ -946,14 +962,17 @@ public class FailoverTransport implement
                 return false;
             }
         }
+
         if (!disposed) {
 
-            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
-            synchronized (sleepMutex) {
-                try {
-                    sleepMutex.wait(reconnectDelay);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
+            if (reconnectDelay > 0) {
+                synchronized (sleepMutex) {
+                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+                    try {
+                        sleepMutex.wait(reconnectDelay);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
 
@@ -965,6 +984,7 @@ public class FailoverTransport implement
                 }
             }
         }
+
         return !disposed;
     }
 
@@ -981,7 +1001,7 @@ public class FailoverTransport implement
                 }
                 backups.removeAll(disposedList);
                 disposedList.clear();
-                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
+                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
                     URI uri = iter.next();
                     if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                         try {
@@ -1016,23 +1036,23 @@ public class FailoverTransport implement
     }
 
     public void reconnect(URI uri) throws IOException {
-        add(true, new URI[] { uri });
+        add(true, new URI[]{uri});
     }
 
     public boolean isReconnectSupported() {
         return this.reconnectSupported;
     }
-    
+
     public void setReconnectSupported(boolean value) {
-        this.reconnectSupported=value;
+        this.reconnectSupported = value;
     }
-   
+
     public boolean isUpdateURIsSupported() {
         return this.updateURIsSupported;
     }
-    
+
     public void setUpdateURIsSupported(boolean value) {
-        this.updateURIsSupported=value;
+        this.updateURIsSupported = value;
     }
 
     public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
@@ -1041,8 +1061,7 @@ public class FailoverTransport implement
             List<URI> add = new ArrayList<URI>();
             if (updatedURIs != null && updatedURIs.length > 0) {
                 Set<URI> set = new HashSet<URI>();
-                for (int i = 0; i < updatedURIs.length; i++) {
-                    URI uri = updatedURIs[i];
+                for (URI uri : updatedURIs) {
                     if (uri != null) {
                         set.add(uri);
                     }
@@ -1063,7 +1082,7 @@ public class FailoverTransport implement
             }
         }
     }
-    
+
     /**
      * @return the updateURIsURL
      */
@@ -1077,7 +1096,7 @@ public class FailoverTransport implement
     public void setUpdateURIsURL(String updateURIsURL) {
         this.updateURIsURL = updateURIsURL;
     }
-    
+
     /**
      * @return the rebalanceUpdateURIs
      */
@@ -1105,32 +1124,32 @@ public class FailoverTransport implement
             stateTracker.connectionInterruptProcessingComplete(this, connectionId);
         }
     }
-    
+
     public ConnectionStateTracker getStateTracker() {
         return stateTracker;
     }
-    
+
     private boolean contains(URI newURI) {
 
         boolean result = false;
         try {
-        for (URI uri:uris) {
-            if (newURI.getPort()==uri.getPort()) {
-                InetAddress newAddr = InetAddress.getByName(newURI.getHost());
-                InetAddress addr = InetAddress.getByName(uri.getHost());
-                if (addr.equals(newAddr)) {
-                    result = true;
-                    break;
+            for (URI uri : uris) {
+                if (newURI.getPort() == uri.getPort()) {
+                    InetAddress newAddr = InetAddress.getByName(newURI.getHost());
+                    InetAddress addr = InetAddress.getByName(uri.getHost());
+                    if (addr.equals(newAddr)) {
+                        result = true;
+                        break;
+                    }
                 }
             }
-        }
-        }catch(IOException e) {
+        } catch (IOException e) {
             result = true;
             LOG.error("Failed to verify URI " + newURI + " already known: " + e);
         }
         return result;
     }
-    
+
     private InputStreamReader getURLStream(String path) throws IOException {
         InputStreamReader result = null;
         URL url = null;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java Wed Sep 28 22:07:21 2011
@@ -58,7 +58,7 @@ public class FailoverTransportFactory ex
      * @throws IOException
      */
     public Transport createTransport(CompositeData compositData) throws IOException {
-        Map options = compositData.getParameters();
+        Map<String, String> options = compositData.getParameters();
         FailoverTransport transport = createTransport(options);
         if (!options.isEmpty()) {
             throw new IllegalArgumentException("Invalid connect parameters: " + options);
@@ -67,7 +67,7 @@ public class FailoverTransportFactory ex
         return transport;
     }
 
-    public FailoverTransport createTransport(Map parameters) throws IOException {
+    public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {
         FailoverTransport transport = new FailoverTransport();
         IntrospectionSupport.setProperties(transport, parameters);
         return transport;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java Wed Sep 28 22:07:21 2011
@@ -36,7 +36,6 @@ public class FailoverConsumerTest extend
     public static final int MSG_COUNT = 100;
     private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerTest.class);
 
-
     public void testPublisherFailsOver() throws Exception {
         // Uncomment this if you want to use remote broker created by
         // NetworkTestSupport.
@@ -72,7 +71,7 @@ public class FailoverConsumerTest extend
         // though).
         // So we must use external broker ant restart it manually.
         LOG.info("You should restart remote broker now and press enter!");
-        System.in.read();
+        //System.in.read();
         // Thread.sleep(20000);
         restartRemoteBroker();
         msg.acknowledge();
@@ -114,6 +113,6 @@ public class FailoverConsumerTest extend
     }
 
     protected String getRemoteURI() {
-        return "tcp://localhost:55555";
+        return "tcp://localhost:61616";
     }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java?rev=1177088&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java Wed Sep 28 22:07:21 2011
@@ -0,0 +1,110 @@
+package org.apache.activemq.transport.failover;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+
+public class InitalReconnectDelayTest {
+
+    private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
+    protected BrokerService broker1;
+    protected BrokerService broker2;
+    protected CountDownLatch broker2Started = new CountDownLatch(1);
+    protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000";
+
+    @Test
+    public void testInitialReconnectDelay() throws Exception {
+
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue destination = session.createQueue("foo");
+        MessageProducer producer = session.createProducer(destination);
+
+        long start = (new Date()).getTime();
+        producer.send(session.createTextMessage("TEST"));
+        long end = (new Date()).getTime();
+
+        //Verify we can send quickly
+        assertTrue((end - start) < 2000);
+
+        //Halt the broker1...
+        LOG.info("Stopping the Broker1...");
+        broker1.stop();
+
+        LOG.info("Attempting to send... failover should kick in...");
+        start = (new Date()).getTime();
+        producer.send(session.createTextMessage("TEST"));
+        end = (new Date()).getTime();
+
+        //Initial reconnection should kick in and be darned close to what we expected
+        LOG.info("Failover took " + (end - start) + " ms.");
+        assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        final String dataDir = "target/data/shared";
+
+        broker1 = new BrokerService();
+
+        broker1.setBrokerName("broker1");
+        broker1.setDeleteAllMessagesOnStartup(true);
+        broker1.setDataDirectory(dataDir);
+        broker1.addConnector("tcp://localhost:62001");
+        broker1.setUseJmx(false);
+        broker1.start();
+        broker1.waitUntilStarted();
+
+        broker2 = new BrokerService();
+        broker2.setBrokerName("broker2");
+        broker2.setDataDirectory(dataDir);
+        broker2.setUseJmx(false);
+        broker2.addConnector("tcp://localhost:62002");
+        broker2.start();
+        broker2.waitUntilStarted();
+
+    }
+
+    protected String getSlaveXml() {
+        return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
+    }
+
+    protected String getMasterXml() {
+        return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
+    }
+
+    @After
+    public void tearDown() throws Exception {
+
+        if (broker1.isStarted()) {
+            broker1.stop();
+            broker1.waitUntilStopped();
+        }
+
+        if (broker2.isStarted()) {
+            broker2.stop();
+            broker2.waitUntilStopped();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(uriString);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
------------------------------------------------------------------------------
    svn:eol-style = native