You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/29 00:07:22 UTC
svn commit: r1177088 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/failover/
test/java/org/apache/activemq/transport/failover/
Author: tabish
Date: Wed Sep 28 22:07:21 2011
New Revision: 1177088
URL: http://svn.apache.org/viewvc?rev=1177088&view=rev
Log:
apply fixes for https://issues.apache.org/jira/browse/AMQ-3513
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Sep 28 22:07:21 2011
@@ -14,11 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.transport.failover;
import java.io.BufferedReader;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
@@ -37,6 +35,7 @@ import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -60,12 +59,9 @@ import org.apache.activemq.util.ServiceS
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A Transport that is made reliable by being able to fail over to another
* transport when a transport failure is detected.
- *
- *
*/
public class FailoverTransport implements CompositeTransport {
@@ -114,12 +110,12 @@ public class FailoverTransport implement
//private boolean connectionInterruptProcessingComplete;
private final TransportListener myTransportListener = createTransportListener();
- private boolean updateURIsSupported=true;
- private boolean reconnectSupported=true;
+ private boolean updateURIsSupported = true;
+ private boolean reconnectSupported = true;
// remember for reconnect thread
private SslContext brokerSslContext;
private String updateURIsURL = null;
- private boolean rebalanceUpdateURIs=true;
+ private boolean rebalanceUpdateURIs = true;
private boolean doRebalance = false;
public FailoverTransport() throws InterruptedIOException {
@@ -130,7 +126,6 @@ public class FailoverTransport implement
public boolean iterate() {
boolean result = false;
boolean buildBackup = true;
- boolean doReconnect = !disposed;
synchronized (backupMutex) {
if ((connectedTransport.get() == null || doRebalance) && !disposed) {
result = doReconnect();
@@ -170,11 +165,11 @@ public class FailoverTransport implement
((Tracked) object).onResponses(command);
}
}
- if (!initialized) {
+ if (!initialized) {
initialized = true;
}
-
- if(command.isConnectionControl()) {
+
+ if (command.isConnectionControl()) {
handleConnectionControl((ConnectionControl) command);
}
if (transportListener != null) {
@@ -238,8 +233,7 @@ public class FailoverTransport implement
connectedTransportURI = null;
connected = false;
- // notify before any reconnect attempt so ack state can be
- // whacked
+ // notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
transportListener.transportInterupted();
}
@@ -292,7 +286,6 @@ public class FailoverTransport implement
LOG.error("Failed to update transport URI's from: " + newTransports, e);
}
}
-
}
}
}
@@ -416,8 +409,7 @@ public class FailoverTransport implement
}
/**
- * @param randomize
- * The randomize to set.
+ * @param randomize The randomize to set.
*/
public void setRandomize(boolean randomize) {
this.randomize = randomize;
@@ -571,7 +563,6 @@ public class FailoverTransport implement
// the outer catch
throw e;
}
-
}
return;
@@ -613,9 +604,9 @@ public class FailoverTransport implement
public void add(boolean rebalance, URI u[]) {
boolean newURI = false;
- for (int i = 0; i < u.length; i++) {
- if (!contains(u[i])) {
- uris.add(u[i]);
+ for (URI uri : u) {
+ if (!contains(uri)) {
+ uris.add(uri);
newURI = true;
}
}
@@ -625,8 +616,8 @@ public class FailoverTransport implement
}
public void remove(boolean rebalance, URI u[]) {
- for (int i = 0; i < u.length; i++) {
- uris.remove(u[i]);
+ for (URI uri : u) {
+ uris.remove(uri);
}
// rebalance is automatic if any connected to removed/stopped broker
}
@@ -634,11 +625,11 @@ public class FailoverTransport implement
public void add(boolean rebalance, String u) {
try {
URI newURI = new URI(u);
- if (contains(newURI)==false) {
+ if (contains(newURI) == false) {
uris.add(newURI);
reconnect(rebalance);
}
-
+
} catch (Exception e) {
LOG.error("Failed to parse URI: " + u);
}
@@ -680,7 +671,9 @@ public class FailoverTransport implement
if (removed) {
l.add(failedConnectTransportURI);
}
- LOG.debug("urlList connectionList:" + l + ", from: " + uris);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("urlList connectionList:" + l + ", from: " + uris);
+ }
return l;
}
@@ -715,12 +708,11 @@ public class FailoverTransport implement
cc.setFaultTolerant(true);
t.oneway(cc);
stateTracker.restore(t);
- Map tmpMap = null;
+ Map<Integer, Command> tmpMap = null;
synchronized (requestMap) {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
- for (Iterator<Command> iter2 = tmpMap.values().iterator(); iter2.hasNext();) {
- Command command = iter2.next();
+ for (Command command : tmpMap.values()) {
if (LOG.isTraceEnabled()) {
LOG.trace("restore requestMap, replay: " + command);
}
@@ -753,44 +745,50 @@ public class FailoverTransport implement
return true;
}
- final boolean doReconnect() {
- Exception failure = null;
- synchronized (reconnectMutex) {
+ private void doUpdateURIsFromDisk() {
- // If updateURIsURL is specified, read the file and add any new
- // transport URI's to this FailOverTransport.
- // Note: Could track file timestamp to avoid unnecessary reading.
- String fileURL = getUpdateURIsURL();
- if (fileURL != null) {
- BufferedReader in = null;
- String newUris = null;
- StringBuffer buffer = new StringBuffer();
-
- try {
- in = new BufferedReader(getURLStream(fileURL));
- while (true) {
- String line = in.readLine();
- if (line == null) {
- break;
- }
- buffer.append(line);
- }
- newUris = buffer.toString();
- } catch (IOException ioe) {
- LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
- } finally {
- if (in != null) {
- try {
- in.close();
- } catch (IOException ioe) {
- // ignore
- }
+ // If updateURIsURL is specified, read the file and add any new
+ // transport URI's to this FailOverTransport.
+ // Note: Could track file timestamp to avoid unnecessary reading.
+ String fileURL = getUpdateURIsURL();
+ if (fileURL != null) {
+ BufferedReader in = null;
+ String newUris = null;
+ StringBuffer buffer = new StringBuffer();
+
+ try {
+ in = new BufferedReader(getURLStream(fileURL));
+ while (true) {
+ String line = in.readLine();
+ if (line == null) {
+ break;
+ }
+ buffer.append(line);
+ }
+ newUris = buffer.toString();
+ } catch (IOException ioe) {
+ LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (IOException ioe) {
+ // ignore
}
}
-
- processNewTransports(isRebalanceUpdateURIs(), newUris);
}
+ processNewTransports(isRebalanceUpdateURIs(), newUris);
+ }
+ }
+
+ final boolean doReconnect() {
+ Exception failure = null;
+ synchronized (reconnectMutex) {
+
+ // First ensure we are up to date.
+ doUpdateURIsFromDisk();
+
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
@@ -808,14 +806,18 @@ public class FailoverTransport implement
doRebalance = false;
return false;
} else {
- LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
+ }
try {
Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) {
disposeTransport(transport);
}
} catch (Exception e) {
- LOG.debug("Caught an exception stopping existing transport for rebalance", e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Caught an exception stopping existing transport for rebalance", e);
+ }
}
}
doRebalance = false;
@@ -847,12 +849,27 @@ public class FailoverTransport implement
}
}
+ // Sleep for the reconnectDelay
+ if (!firstConnection && (reconnectDelay > 0) && !disposed) {
+ synchronized (sleepMutex) {
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+ try {
+ sleepMutex.wait(reconnectDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
Iterator<URI> iter = connectList.iterator();
while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
+
URI uri = iter.next();
Transport t = null;
try {
- LOG.debug("Attempting connect to: " + uri);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempting connect to: " + uri);
+ }
SslContext.setCurrentSslContext(brokerSslContext);
t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
@@ -924,8 +941,7 @@ public class FailoverTransport implement
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has
- // been initialized
- // for this instance.
+ // been initialized for this instance.
synchronized (listenerMutex) {
if (transportListener == null) {
try {
@@ -946,14 +962,17 @@ public class FailoverTransport implement
return false;
}
}
+
if (!disposed) {
- LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
- synchronized (sleepMutex) {
- try {
- sleepMutex.wait(reconnectDelay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (reconnectDelay > 0) {
+ synchronized (sleepMutex) {
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+ try {
+ sleepMutex.wait(reconnectDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
}
@@ -965,6 +984,7 @@ public class FailoverTransport implement
}
}
}
+
return !disposed;
}
@@ -981,7 +1001,7 @@ public class FailoverTransport implement
}
backups.removeAll(disposedList);
disposedList.clear();
- for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize;) {
+ for (Iterator<URI> iter = connectList.iterator(); iter.hasNext() && backups.size() < backupPoolSize; ) {
URI uri = iter.next();
if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
try {
@@ -1016,23 +1036,23 @@ public class FailoverTransport implement
}
public void reconnect(URI uri) throws IOException {
- add(true, new URI[] { uri });
+ add(true, new URI[]{uri});
}
public boolean isReconnectSupported() {
return this.reconnectSupported;
}
-
+
public void setReconnectSupported(boolean value) {
- this.reconnectSupported=value;
+ this.reconnectSupported = value;
}
-
+
public boolean isUpdateURIsSupported() {
return this.updateURIsSupported;
}
-
+
public void setUpdateURIsSupported(boolean value) {
- this.updateURIsSupported=value;
+ this.updateURIsSupported = value;
}
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
@@ -1041,8 +1061,7 @@ public class FailoverTransport implement
List<URI> add = new ArrayList<URI>();
if (updatedURIs != null && updatedURIs.length > 0) {
Set<URI> set = new HashSet<URI>();
- for (int i = 0; i < updatedURIs.length; i++) {
- URI uri = updatedURIs[i];
+ for (URI uri : updatedURIs) {
if (uri != null) {
set.add(uri);
}
@@ -1063,7 +1082,7 @@ public class FailoverTransport implement
}
}
}
-
+
/**
* @return the updateURIsURL
*/
@@ -1077,7 +1096,7 @@ public class FailoverTransport implement
public void setUpdateURIsURL(String updateURIsURL) {
this.updateURIsURL = updateURIsURL;
}
-
+
/**
* @return the rebalanceUpdateURIs
*/
@@ -1105,32 +1124,32 @@ public class FailoverTransport implement
stateTracker.connectionInterruptProcessingComplete(this, connectionId);
}
}
-
+
public ConnectionStateTracker getStateTracker() {
return stateTracker;
}
-
+
private boolean contains(URI newURI) {
boolean result = false;
try {
- for (URI uri:uris) {
- if (newURI.getPort()==uri.getPort()) {
- InetAddress newAddr = InetAddress.getByName(newURI.getHost());
- InetAddress addr = InetAddress.getByName(uri.getHost());
- if (addr.equals(newAddr)) {
- result = true;
- break;
+ for (URI uri : uris) {
+ if (newURI.getPort() == uri.getPort()) {
+ InetAddress newAddr = InetAddress.getByName(newURI.getHost());
+ InetAddress addr = InetAddress.getByName(uri.getHost());
+ if (addr.equals(newAddr)) {
+ result = true;
+ break;
+ }
}
}
- }
- }catch(IOException e) {
+ } catch (IOException e) {
result = true;
LOG.error("Failed to verify URI " + newURI + " already known: " + e);
}
return result;
}
-
+
private InputStreamReader getURLStream(String path) throws IOException {
InputStreamReader result = null;
URL url = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java Wed Sep 28 22:07:21 2011
@@ -58,7 +58,7 @@ public class FailoverTransportFactory ex
* @throws IOException
*/
public Transport createTransport(CompositeData compositData) throws IOException {
- Map options = compositData.getParameters();
+ Map<String, String> options = compositData.getParameters();
FailoverTransport transport = createTransport(options);
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
@@ -67,7 +67,7 @@ public class FailoverTransportFactory ex
return transport;
}
- public FailoverTransport createTransport(Map parameters) throws IOException {
+ public FailoverTransport createTransport(Map<String, String> parameters) throws IOException {
FailoverTransport transport = new FailoverTransport();
IntrospectionSupport.setProperties(transport, parameters);
return transport;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java?rev=1177088&r1=1177087&r2=1177088&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerTest.java Wed Sep 28 22:07:21 2011
@@ -36,7 +36,6 @@ public class FailoverConsumerTest extend
public static final int MSG_COUNT = 100;
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerTest.class);
-
public void testPublisherFailsOver() throws Exception {
// Uncomment this if you want to use remote broker created by
// NetworkTestSupport.
@@ -72,7 +71,7 @@ public class FailoverConsumerTest extend
// though).
// So we must use external broker ant restart it manually.
LOG.info("You should restart remote broker now and press enter!");
- System.in.read();
+ //System.in.read();
// Thread.sleep(20000);
restartRemoteBroker();
msg.acknowledge();
@@ -114,6 +113,6 @@ public class FailoverConsumerTest extend
}
protected String getRemoteURI() {
- return "tcp://localhost:55555";
+ return "tcp://localhost:61616";
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java?rev=1177088&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java Wed Sep 28 22:07:21 2011
@@ -0,0 +1,110 @@
+package org.apache.activemq.transport.failover;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertTrue;
+
+public class InitalReconnectDelayTest {
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
+ protected BrokerService broker1;
+ protected BrokerService broker2;
+ protected CountDownLatch broker2Started = new CountDownLatch(1);
+ protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false&initialReconnectDelay=15000";
+
+ @Test
+ public void testInitialReconnectDelay() throws Exception {
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
+ Connection connection = connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue("foo");
+ MessageProducer producer = session.createProducer(destination);
+
+ long start = (new Date()).getTime();
+ producer.send(session.createTextMessage("TEST"));
+ long end = (new Date()).getTime();
+
+ //Verify we can send quickly
+ assertTrue((end - start) < 2000);
+
+ //Halt the broker1...
+ LOG.info("Stopping the Broker1...");
+ broker1.stop();
+
+ LOG.info("Attempting to send... failover should kick in...");
+ start = (new Date()).getTime();
+ producer.send(session.createTextMessage("TEST"));
+ end = (new Date()).getTime();
+
+ //Inital reconnection should kick in and be darned close to what we expected
+ LOG.info("Failover took " + (end - start) + " ms.");
+ assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ final String dataDir = "target/data/shared";
+
+ broker1 = new BrokerService();
+
+ broker1.setBrokerName("broker1");
+ broker1.setDeleteAllMessagesOnStartup(true);
+ broker1.setDataDirectory(dataDir);
+ broker1.addConnector("tcp://localhost:62001");
+ broker1.setUseJmx(false);
+ broker1.start();
+ broker1.waitUntilStarted();
+
+ broker2 = new BrokerService();
+ broker2.setBrokerName("broker2");
+ broker2.setDataDirectory(dataDir);
+ broker2.setUseJmx(false);
+ broker2.addConnector("tcp://localhost:62002");
+ broker2.start();
+ broker2.waitUntilStarted();
+
+ }
+
+ protected String getSlaveXml() {
+ return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
+ }
+
+ protected String getMasterXml() {
+ return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ if (broker1.isStarted()) {
+ broker1.stop();
+ broker1.waitUntilStopped();
+ }
+
+ if (broker2.isStarted()) {
+ broker2.stop();
+ broker2.waitUntilStopped();
+ }
+ }
+
+ protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ return new ActiveMQConnectionFactory(uriString);
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
------------------------------------------------------------------------------
svn:eol-style = native