You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/02/06 17:25:45 UTC
svn commit: r1241061 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/failover/
test/java/org/apache/activemq/transport/failover/
Author: dejanb
Date: Mon Feb 6 16:25:44 2012
New Revision: 1241061
URL: http://svn.apache.org/viewvc?rev=1241061&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3699 - priority uris for failover transport
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1241061&r1=1241060&r2=1241061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon Feb 6 16:25:44 2012
@@ -115,6 +115,11 @@ public class FailoverTransport implement
private String updateURIsURL = null;
private boolean rebalanceUpdateURIs = true;
private boolean doRebalance = false;
+ private boolean connectedToPriority = false;
+
+ private boolean priorityBackup = false;
+ private ArrayList<URI> priorityList = new ArrayList<URI>();
+ private boolean priorityBackupAvailable = false;
public FailoverTransport() throws InterruptedIOException {
brokerSslContext = SslContext.getCurrentSslContext();
@@ -128,13 +133,22 @@ public class FailoverTransport implement
}
boolean buildBackup = true;
synchronized (backupMutex) {
- if ((connectedTransport.get() == null || doRebalance) && !disposed) {
+ if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
result = doReconnect();
buildBackup = false;
+ connectedToPriority = isPriority(connectedTransportURI);
}
}
if (buildBackup) {
buildBackups();
+ if (priorityBackup && !connectedToPriority) {
+ try {
+ doDelay();
+ reconnectTask.wakeup();
+ } catch (InterruptedException e) {
+ LOG.debug("Reconnect task has been interrupted.", e);
+ }
+ }
} else {
// build backups on the next iteration
buildBackup = true;
@@ -471,6 +485,27 @@ public class FailoverTransport implement
this.maxCacheSize = maxCacheSize;
}
+ public boolean isPriorityBackup() {
+ return priorityBackup;
+ }
+
+ public void setPriorityBackup(boolean priorityBackup) {
+ this.priorityBackup = priorityBackup;
+ }
+
+ public void setPriorityURIs(String priorityURIs) {
+ StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
+ while (tokenizer.hasMoreTokens()) {
+ String str = tokenizer.nextToken();
+ try {
+ URI uri = new URI(str);
+ priorityList.add(uri);
+ } catch (Exception e) {
+ LOG.error("Failed to parse broker address: " + str, e);
+ }
+ }
+ }
+
public void oneway(Object o) throws IOException {
Command command = (Command) o;
@@ -807,7 +842,7 @@ public class FailoverTransport implement
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
- if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
+ if ((connectedTransport.get() != null && !doRebalance && !priorityBackupAvailable) || disposed || connectionFailure != null) {
return false;
} else {
List<URI> connectList = getConnectList();
@@ -844,7 +879,7 @@ public class FailoverTransport implement
// If we have a backup already waiting lets try it.
synchronized (backupMutex) {
- if (backup && !backups.isEmpty()) {
+ if ((priorityBackup || backup) && !backups.isEmpty()) {
ArrayList<BackupTransport> l = new ArrayList(backups);
if (randomize) {
Collections.shuffle(l);
@@ -853,6 +888,13 @@ public class FailoverTransport implement
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
+ if (priorityBackup && priorityBackupAvailable) {
+ Transport old = this.connectedTransport.getAndSet(null);
+ if (transport != null) {
+ disposeTransport(old);
+ }
+ priorityBackupAvailable = false;
+ }
}
}
@@ -979,30 +1021,33 @@ public class FailoverTransport implement
}
if (!disposed) {
+ doDelay();
+ }
- if (reconnectDelay > 0) {
- synchronized (sleepMutex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
- }
- try {
- sleepMutex.wait(reconnectDelay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
+ return !disposed;
+ }
- if (useExponentialBackOff) {
- // Exponential increment of reconnect delay.
- reconnectDelay *= backOffMultiplier;
- if (reconnectDelay > maxReconnectDelay) {
- reconnectDelay = maxReconnectDelay;
+ private void doDelay() {
+ if (reconnectDelay > 0) {
+ synchronized (sleepMutex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection");
+ }
+ try {
+ sleepMutex.wait(reconnectDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
}
- return !disposed;
+ if (useExponentialBackOff) {
+ // Exponential increment of reconnect delay.
+ reconnectDelay *= backOffMultiplier;
+ if (reconnectDelay > maxReconnectDelay) {
+ reconnectDelay = maxReconnectDelay;
+ }
+ }
}
private void resetReconnectDelay() {
@@ -1035,8 +1080,14 @@ public class FailoverTransport implement
final boolean buildBackups() {
synchronized (backupMutex) {
- if (!disposed && backup && backups.size() < backupPoolSize) {
+ if (!disposed && (backup || priorityBackup) && backups.size() < backupPoolSize) {
+ ArrayList<URI> backupList = new ArrayList<URI>(priorityList);
List<URI> connectList = getConnectList();
+ for (URI uri: connectList) {
+ if (!backupList.contains(uri)) {
+ backupList.add(uri);
+ }
+ }
// removed disposed backups
List<BackupTransport> disposedList = new ArrayList<BackupTransport>();
for (BackupTransport bt : backups) {
@@ -1046,7 +1097,7 @@ public class FailoverTransport implement
}
backups.removeAll(disposedList);
disposedList.clear();
- for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
+ for (Iterator<URI> iter = backupList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
@@ -1059,6 +1110,9 @@ public class FailoverTransport implement
t.start();
bt.setTransport(t);
backups.add(bt);
+ if (priorityBackup && isPriority(uri)) {
+ priorityBackupAvailable = true;
+ }
}
} catch (Exception e) {
LOG.debug("Failed to build backup ", e);
@@ -1071,6 +1125,13 @@ public class FailoverTransport implement
}
return false;
}
+
+ protected boolean isPriority(URI uri) {
+ if (!priorityList.isEmpty()) {
+ return priorityList.contains(uri);
+ }
+ return uris.indexOf(uri) == 0;
+ }
public boolean isDisposed() {
return disposed;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1241061&r1=1241060&r2=1241061&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java Mon Feb 6 16:25:44 2012
@@ -65,6 +65,12 @@ public class FailoverClusterTestSupport
set.size() == 3);
}
+ protected void assertAllConnectedTo(String url) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ assertEquals(c.getTransportChannel().getRemoteAddress(), url);
+ }
+ }
+
protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService);
}
@@ -72,6 +78,12 @@ public class FailoverClusterTestSupport
protected BrokerService getBroker(String name) {
return brokers.get(name);
}
+
+ protected void stopBroker(String name) throws Exception {
+ BrokerService broker = brokers.remove(name);
+ broker.stop();
+ broker.waitUntilStopped();
+ }
protected BrokerService removeBroker(String name) {
return brokers.remove(name);
@@ -126,11 +138,14 @@ public class FailoverClusterTestSupport
}
}
- @SuppressWarnings("unused")
protected void createClients() throws Exception {
+ createClients(NUMBER_OF_CLIENTS);
+ }
+
+ protected void createClients(int num) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
clientUrl);
- for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
+ for (int i = 0; i < num; i++) {
ActiveMQConnection c = (ActiveMQConnection) factory
.createConnection();
c.start();
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java?rev=1241061&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java Mon Feb 6 16:25:44 2012
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+public class FailoverPriorityTest extends FailoverClusterTestSupport {
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
+ private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
+ private HashMap<String,String> urls = new HashMap<String,String>();
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS);
+ urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS);
+ }
+
+ private static final String BROKER_A_NAME = "BROKERA";
+ private static final String BROKER_B_NAME = "BROKERB";
+
+
+ public void testPriorityBackup() throws Exception {
+ createBrokerA();
+ createBrokerB();
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ Thread.sleep(1000);
+
+ setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
+ createClients(5);
+
+ assertAllConnectedTo(urls.get(BROKER_A_NAME));
+
+
+ restart(false, BROKER_A_NAME, BROKER_B_NAME);
+
+ for (int i = 0; i < 3; i++) {
+ restart(true, BROKER_A_NAME, BROKER_B_NAME);
+ }
+
+ Thread.sleep(5000);
+
+ restart(false, BROKER_A_NAME, BROKER_B_NAME);
+
+ }
+
+ public void testPriorityBackupList() throws Exception {
+ createBrokerA();
+ createBrokerB();
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ Thread.sleep(1000);
+
+ setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false");
+ createClients(5);
+
+ Thread.sleep(3000);
+
+ assertAllConnectedTo(urls.get(BROKER_B_NAME));
+
+ restart(false, BROKER_B_NAME, BROKER_A_NAME);
+
+ for (int i = 0; i < 3; i++) {
+ restart(true, BROKER_B_NAME, BROKER_A_NAME);
+ }
+
+ restart(false, BROKER_B_NAME, BROKER_A_NAME);
+
+ }
+
+ private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
+
+ Thread.sleep(1000);
+
+ if (primary) {
+ LOG.info("Stopping " + primaryName);
+ stopBroker(primaryName);
+ } else {
+ LOG.info("Stopping " + secondaryName);
+ stopBroker(secondaryName);
+ }
+ Thread.sleep(5000);
+
+ if (primary) {
+ assertAllConnectedTo(urls.get(secondaryName));
+ } else {
+ assertAllConnectedTo(urls.get(primaryName));
+ }
+
+ if (primary) {
+ LOG.info("Starting " + primaryName);
+ createBrokerByName(primaryName);
+ getBroker(primaryName).waitUntilStarted();
+ } else {
+ LOG.info("Starting " + secondaryName);
+ createBrokerByName(secondaryName);
+ getBroker(secondaryName).waitUntilStarted();
+ }
+
+ Thread.sleep(5000);
+
+ assertAllConnectedTo(urls.get(primaryName));
+
+ }
+
+ private void createBrokerByName(String name) throws Exception {
+ if (name.equals(BROKER_A_NAME)) {
+ createBrokerA();
+ } else if (name.equals(BROKER_B_NAME)) {
+ createBrokerB();
+ } else {
+ throw new Exception("Unknown broker " + name);
+ }
+ }
+
+ private void createBrokerA() throws Exception {
+ if (getBroker(BROKER_A_NAME) == null) {
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_A_NAME).start();
+ }
+ }
+
+ private void createBrokerB() throws Exception {
+ if (getBroker(BROKER_B_NAME) == null) {
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_B_NAME).start();
+ }
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ shutdownClients();
+ destroyBrokerCluster();
+ }
+
+}