You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/02/06 17:25:45 UTC

svn commit: r1241061 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/

Author: dejanb
Date: Mon Feb  6 16:25:44 2012
New Revision: 1241061

URL: http://svn.apache.org/viewvc?rev=1241061&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3699 - priority uris for failover transport

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1241061&r1=1241060&r2=1241061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Feb  6 16:25:44 2012
@@ -115,6 +115,11 @@ public class FailoverTransport implement
     private String updateURIsURL = null;
     private boolean rebalanceUpdateURIs = true;
     private boolean doRebalance = false;
+    private boolean connectedToPriority = false; // refreshed from isPriority(connectedTransportURI) after each reconnect attempt
+
+    private boolean priorityBackup = false; // option flag: turns on priority handling and lets backups be built even when 'backup' is off
+    private ArrayList<URI> priorityList = new ArrayList<URI>(); // filled by setPriorityURIs; when empty, isPriority uses the first entry of uris
+    private boolean priorityBackupAvailable = false; // set by buildBackups once a backup to a priority URI exists; cleared when that backup is switched to
 
     public FailoverTransport() throws InterruptedIOException {
         brokerSslContext = SslContext.getCurrentSslContext();
@@ -128,13 +133,22 @@ public class FailoverTransport implement
                 }
                 boolean buildBackup = true;
                 synchronized (backupMutex) {
-                    if ((connectedTransport.get() == null || doRebalance) && !disposed) {
+                    if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
                         result = doReconnect();
                         buildBackup = false;
+                        connectedToPriority = isPriority(connectedTransportURI);
                     }
                 }
                 if (buildBackup) {
                     buildBackups();
+                    if (priorityBackup && !connectedToPriority) {
+                        try {
+                            doDelay();
+                            reconnectTask.wakeup();
+                        } catch (InterruptedException e) {
+                            LOG.debug("Reconnect task has been interrupted.", e);
+                        }
+                    }
                 } else {
                     // build backups on the next iteration
                     buildBackup = true;
@@ -471,6 +485,27 @@ public class FailoverTransport implement
         this.maxCacheSize = maxCacheSize;
     }
 
+    public boolean isPriorityBackup() { // reports the current value of the priorityBackup option
+        return this.priorityBackup;
+    }
+
+    public void setPriorityBackup(boolean priorityBackup) { // stores the option; it is read by the reconnect task and by buildBackups
+        this.priorityBackup = priorityBackup;
+    }
+
+    public void setPriorityURIs(String priorityURIs) {
+        StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
+        while (tokenizer.hasMoreTokens()) {
+            String str = tokenizer.nextToken();
+            try {
+                URI uri = new URI(str);
+                priorityList.add(uri);
+            } catch (Exception e) {
+                LOG.error("Failed to parse broker address: " + str, e);
+            }
+        }
+    }
+
     public void oneway(Object o) throws IOException {
 
         Command command = (Command) o;
@@ -807,7 +842,7 @@ public class FailoverTransport implement
             if (disposed || connectionFailure != null) {
                 reconnectMutex.notifyAll();
             }
-            if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
+            if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
                 return false;
             } else {
                 List<URI> connectList = getConnectList();
@@ -844,7 +879,7 @@ public class FailoverTransport implement
 
                     // If we have a backup already waiting lets try it.
                     synchronized (backupMutex) {
-                        if (backup && !backups.isEmpty()) {
+                        if ((priorityBackup || backup) && !backups.isEmpty()) {
                             ArrayList<BackupTransport> l = new ArrayList(backups);
                             if (randomize) {
                                 Collections.shuffle(l);
@@ -853,6 +888,13 @@ public class FailoverTransport implement
                             backups.remove(bt);
                             transport = bt.getTransport();
                             uri = bt.getUri();
+                            if (priorityBackup && priorityBackupAvailable) { // switching to a priority backup: retire the current connection first
+                                Transport old = this.connectedTransport.getAndSet(null);
+                                if (old != null) { // guard the transport being disposed; this branch can run with nothing connected
+                                    disposeTransport(old);
+                                }
+                                priorityBackupAvailable = false;
+                            }
                         }
                     }
 
@@ -979,30 +1021,33 @@ public class FailoverTransport implement
         }
 
         if (!disposed) {
+            doDelay();
+        }
 
-            if (reconnectDelay > 0) {
-                synchronized (sleepMutex) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
-                    }
-                    try {
-                        sleepMutex.wait(reconnectDelay);
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
+        return !disposed;
+    }
 
-            if (useExponentialBackOff) {
-                // Exponential increment of reconnect delay.
-                reconnectDelay *= backOffMultiplier;
-                if (reconnectDelay > maxReconnectDelay) {
-                    reconnectDelay = maxReconnectDelay;
+    private void doDelay() { // pauses for reconnectDelay ms (when positive), then grows the delay if exponential back-off is on
+        if (reconnectDelay > 0) {
+            synchronized (sleepMutex) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
+                }
+                try {
+                    sleepMutex.wait(reconnectDelay);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt(); // keep the interrupt flag set; the pause just ends early
                 }
             }
         }
 
-        return !disposed;
+        if (useExponentialBackOff) {
+            // Exponential increment of reconnect delay.
+            reconnectDelay *= backOffMultiplier;
+            if (reconnectDelay > maxReconnectDelay) {
+                reconnectDelay = maxReconnectDelay; // never exceed the configured ceiling
+            }
+        }
     }
 
     private void resetReconnectDelay() {
@@ -1035,8 +1080,14 @@ public class FailoverTransport implement
 
     final boolean buildBackups() {
         synchronized (backupMutex) {
-            if (!disposed && backup && backups.size() < backupPoolSize) {
+            if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
+                ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
                 List<URI> connectList = getConnectList();
+                for (URI uri: connectList) {
+                    if (!backupList.contains(uri)) {
+                        backupList.add(uri);
+                    }
+                }
                 // removed disposed backups
                 List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
                 for (BackupTransport bt : backups) {
@@ -1046,7 +1097,7 @@ public class FailoverTransport implement
                 }
                 backups.removeAll(disposedList);
                 disposedList.clear();
-                for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
+                for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
                     URI uri = iter.next();
                     if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
                         try {
@@ -1059,6 +1110,9 @@ public class FailoverTransport implement
                                 t.start();
                                 bt.setTransport(t);
                                 backups.add(bt);
+                                if (priorityBackup && isPriority(uri)) {
+                                   priorityBackupAvailable = true;
+                                }
                             }
                         } catch (Exception e) {
                             LOG.debug("Failed to build backup ", e);
@@ -1071,6 +1125,13 @@ public class FailoverTransport implement
         }
         return false;
     }
+    
+    protected boolean isPriority(URI uri) { // an explicit priority list decides; without one only the first entry of uris counts
+        if (priorityList.isEmpty()) {
+            return uris.indexOf(uri) == 0;
+        }
+        return priorityList.contains(uri);
+    }
 
     public boolean isDisposed() {
         return disposed;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1241061&r1=1241060&r2=1241061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java Mon Feb  6 16:25:44 2012
@@ -65,6 +65,12 @@ public class FailoverClusterTestSupport 
                 set.size() == 3);
     }
 
+    protected void assertAllConnectedTo(String url) throws Exception { // fails unless every client connection reports url as its remote address
+        for (ActiveMQConnection c : connections) {
+            assertEquals(url, c.getTransportChannel().getRemoteAddress()); // JUnit order is (expected, actual)
+        }
+    }
+
     protected void addBroker(String name, BrokerService brokerService) {
         brokers.put(name, brokerService);
     }
@@ -72,6 +78,12 @@ public class FailoverClusterTestSupport 
     protected BrokerService getBroker(String name) {
         return brokers.get(name);
     }
+    
+    protected void stopBroker(String name) throws Exception { // unregisters the named broker, then stops it and waits
+        BrokerService stopping = brokers.remove(name); // NOTE(review): an unregistered name yields null and the next line throws NullPointerException
+        stopping.stop();
+        stopping.waitUntilStopped();
+    }
 
     protected BrokerService removeBroker(String name) {
         return brokers.remove(name);
@@ -126,11 +138,14 @@ public class FailoverClusterTestSupport 
         }
     }
 
-    @SuppressWarnings("unused")
     protected void createClients() throws Exception {
+        createClients(NUMBER_OF_CLIENTS); // default client count; delegates to the int overload
+    }
+    
+    protected void createClients(int num) throws Exception {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
                 clientUrl);
-        for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
+        for (int i = 0; i < num; i++) {
             ActiveMQConnection c = (ActiveMQConnection) factory
                     .createConnection();
             c.start();

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java?rev=1241061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java Mon Feb  6 16:25:44 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+public class FailoverPriorityTest extends FailoverClusterTestSupport { // checks which broker failover clients are attached to when priorityBackup is enabled
+
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
+    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
+    private HashMap<String,String> urls = new HashMap<String,String>(); // broker name -> client connector address, filled in setUp
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS);
+        urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS);
+    }
+
+    private static final String BROKER_A_NAME = "BROKERA";
+    private static final String BROKER_B_NAME = "BROKERB";
+    
+    
+    public void testPriorityBackup() throws Exception { // no priorityURIs option: broker A, listed first in the client URL, is asserted as the preferred one
+        createBrokerA();
+        createBrokerB();
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        Thread.sleep(1000);
+
+        setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
+        createClients(5);
+
+        assertAllConnectedTo(urls.get(BROKER_A_NAME));
+
+
+        restart(false, BROKER_A_NAME, BROKER_B_NAME); // bounce B: clients are expected to stay on A throughout
+        
+        for (int i = 0; i < 3; i++) {
+            restart(true, BROKER_A_NAME, BROKER_B_NAME); // bounce A: clients are expected on B while A is down, then back on A
+        }
+
+        Thread.sleep(5000);
+
+        restart(false, BROKER_A_NAME, BROKER_B_NAME);
+
+    }
+
+    public void testPriorityBackupList() throws Exception { // priorityURIs names B's address, so B is asserted as preferred although A is listed first
+        createBrokerA();
+        createBrokerB();
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        Thread.sleep(1000);
+
+        setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false");
+        createClients(5);
+
+        Thread.sleep(3000); // pause before the first assertion, unlike testPriorityBackup which asserts immediately
+
+        assertAllConnectedTo(urls.get(BROKER_B_NAME));
+
+        restart(false, BROKER_B_NAME, BROKER_A_NAME);
+
+        for (int i = 0; i < 3; i++) {
+            restart(true, BROKER_B_NAME, BROKER_A_NAME);
+        }
+
+        restart(false, BROKER_B_NAME, BROKER_A_NAME);
+
+    }
+    
+    private void restart(boolean primary, String primaryName, String secondaryName) throws Exception { // bounces primaryName when primary is true, otherwise secondaryName; always ends by asserting clients are on primaryName
+
+        Thread.sleep(1000);
+
+        if (primary) {
+            LOG.info("Stopping " + primaryName);
+            stopBroker(primaryName);
+        } else {
+            LOG.info("Stopping " + secondaryName);
+            stopBroker(secondaryName);
+        }
+        Thread.sleep(5000); // fixed pause before checking where the clients are connected
+
+        if (primary) {
+            assertAllConnectedTo(urls.get(secondaryName)); // primary is down: clients must be on the secondary
+        } else {
+            assertAllConnectedTo(urls.get(primaryName)); // secondary is down: clients must still be on the primary
+        }
+
+        if (primary) {
+            LOG.info("Starting " + primaryName);
+            createBrokerByName(primaryName);
+            getBroker(primaryName).waitUntilStarted();
+        } else {
+            LOG.info("Starting " + secondaryName);
+            createBrokerByName(secondaryName);
+            getBroker(secondaryName).waitUntilStarted();
+        }
+
+        Thread.sleep(5000); // fixed pause before the final check
+
+        assertAllConnectedTo(urls.get(primaryName));
+        
+    }
+    
+    private void createBrokerByName(String name) throws Exception { // dispatches to createBrokerA / createBrokerB; any other name is an error
+        if (name.equals(BROKER_A_NAME)) {
+            createBrokerA();
+        } else if (name.equals(BROKER_B_NAME)) {
+            createBrokerB();
+        } else {
+            throw new Exception("Unknown broker " + name);
+        }
+    }
+
+    private void createBrokerA() throws Exception { // does nothing when a broker is already registered under BROKER_A_NAME
+        if (getBroker(BROKER_A_NAME) == null) {
+            addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+            addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
+            addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            getBroker(BROKER_A_NAME).start();
+        }
+    }
+
+    private void createBrokerB() throws Exception { // does nothing when a broker is already registered under BROKER_B_NAME
+        if (getBroker(BROKER_B_NAME) == null) {
+            addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+            addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
+            addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            getBroker(BROKER_B_NAME).start();
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception { // closes the clients before the brokers are destroyed
+        shutdownClients();
+        destroyBrokerCluster();
+    }
+
+}