You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/04/18 15:46:03 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6248
Repository: activemq
Updated Branches:
refs/heads/master 11622b3af -> 23a5beb86
https://issues.apache.org/jira/browse/AMQ-6248
Prevent concurrent calls to handleTransportFailure from closing an
already reconnected transport instance.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/23a5beb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/23a5beb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/23a5beb8
Branch: refs/heads/master
Commit: 23a5beb86c3dac151ae3ed242ce506aa0048bd03
Parents: 11622b3
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Apr 18 09:45:45 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 18 09:45:45 2016 -0400
----------------------------------------------------------------------
.../transport/failover/FailoverTransport.java | 195 ++++++++-----------
1 file changed, 86 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/23a5beb8/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 553036b..c0bc9da 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -111,9 +111,7 @@ public class FailoverTransport implements CompositeTransport {
private boolean trackMessages = false;
private boolean trackTransactionProducers = true;
private int maxCacheSize = 128 * 1024;
- private final TransportListener disposedListener = new DefaultTransportListener() {
- };
- private final TransportListener myTransportListener = createTransportListener();
+ private final TransportListener disposedListener = new DefaultTransportListener() {};
private boolean updateURIsSupported = true;
private boolean reconnectSupported = true;
// remember for reconnect thread
@@ -180,61 +178,61 @@ public class FailoverTransport implements CompositeTransport {
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
}
- TransportListener createTransportListener() {
+ private void processCommand(Object incoming) {
+ Command command = (Command) incoming;
+ if (command == null) {
+ return;
+ }
+ if (command.isResponse()) {
+ Object object = null;
+ synchronized (requestMap) {
+ object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
+ }
+ if (object != null && object.getClass() == Tracked.class) {
+ ((Tracked) object).onResponses(command);
+ }
+ }
+
+ if (command.isConnectionControl()) {
+ handleConnectionControl((ConnectionControl) command);
+ } else if (command.isConsumerControl()) {
+ ConsumerControl consumerControl = (ConsumerControl)command;
+ if (consumerControl.isClose()) {
+ stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
+ }
+ }
+
+ if (transportListener != null) {
+ transportListener.onCommand(command);
+ }
+ }
+
+ private TransportListener createTransportListener(final Transport owner) {
return new TransportListener() {
+
@Override
public void onCommand(Object o) {
- Command command = (Command) o;
- if (command == null) {
- return;
- }
- if (command.isResponse()) {
- Object object = null;
- synchronized (requestMap) {
- object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
- }
- if (object != null && object.getClass() == Tracked.class) {
- ((Tracked) object).onResponses(command);
- }
- }
-
- if (command.isConnectionControl()) {
- handleConnectionControl((ConnectionControl) command);
- }
- else if (command.isConsumerControl()) {
- ConsumerControl consumerControl = (ConsumerControl)command;
- if (consumerControl.isClose()) {
- stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
- }
-
- }
- if (transportListener != null) {
- transportListener.onCommand(command);
- }
+ processCommand(o);
}
@Override
public void onException(IOException error) {
try {
- handleTransportFailure(error);
+ handleTransportFailure(owner, error);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- transportListener.onException(new InterruptedIOException());
+ if (transportListener != null) {
+ transportListener.onException(new InterruptedIOException());
+ }
}
}
@Override
public void transportInterupted() {
- if (transportListener != null) {
- transportListener.transportInterupted();
- }
}
@Override
public void transportResumed() {
- if (transportListener != null) {
- transportListener.transportResumed();
- }
}
};
}
@@ -245,6 +243,10 @@ public class FailoverTransport implements CompositeTransport {
}
public final void handleTransportFailure(IOException e) throws InterruptedException {
+ handleTransportFailure(getConnectedTransport(), e);
+ }
+
+ public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
if (shuttingDown) {
// shutdown info sent and remote socket closed and we see that before a local close
// let the close do the work
@@ -256,21 +258,25 @@ public class FailoverTransport implements CompositeTransport {
}
// could be blocked in write with the reconnectMutex held, but still needs to be whacked
- Transport transport = connectedTransport.getAndSet(null);
- if (transport != null) {
- disposeTransport(transport);
+ Transport transport = null;
+
+ if (connectedTransport.compareAndSet(failed, null)) {
+ transport = failed;
+ if (transport != null) {
+ disposeTransport(transport);
+ }
}
synchronized (reconnectMutex) {
if (transport != null && connectedTransport.get() == null) {
-
boolean reconnectOk = false;
if (canReconnect()) {
reconnectOk = true;
}
- LOG.warn("Transport (" + connectedTransportURI + ") failed"
- + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
+
+ LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
+ connectedTransport, (reconnectOk ? "," : ", not"), e);
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
@@ -347,9 +353,7 @@ public class FailoverTransport implements CompositeTransport {
@Override
public void start() throws Exception {
synchronized (reconnectMutex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Started " + this);
- }
+ LOG.debug("Started {}", this);
if (started) {
return;
}
@@ -373,7 +377,7 @@ public class FailoverTransport implements CompositeTransport {
try {
synchronized (reconnectMutex) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Stopped " + this);
+ LOG.debug("Stopped {}", this);
}
if (!started) {
return;
@@ -407,9 +411,7 @@ public class FailoverTransport implements CompositeTransport {
}
for (Transport transport : backupsToStop) {
try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Stopped backup: " + transport);
- }
+ LOG.trace("Stopped backup: {}", transport);
disposeTransport(transport);
} catch (Exception e) {
}
@@ -581,7 +583,7 @@ public class FailoverTransport implements CompositeTransport {
if (command.isResponseRequired()) {
Response response = new Response();
response.setCorrelationId(command.getCommandId());
- myTransportListener.onCommand(response);
+ processCommand(response);
}
return;
} else if (command instanceof MessagePull) {
@@ -591,7 +593,7 @@ public class FailoverTransport implements CompositeTransport {
MessageDispatch dispatch = new MessageDispatch();
dispatch.setConsumerId(pullRequest.getConsumerId());
dispatch.setDestination(pullRequest.getDestination());
- myTransportListener.onCommand(dispatch);
+ processCommand(dispatch);
}
return;
}
@@ -607,24 +609,19 @@ public class FailoverTransport implements CompositeTransport {
boolean timedout = false;
while (transport == null && !disposed && connectionFailure == null
&& !Thread.currentThread().isInterrupted() && willReconnect()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for transport to reconnect..: " + command);
- }
+
+ LOG.trace("Waiting for transport to reconnect..: {}", command);
long end = System.currentTimeMillis();
if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
timedout = true;
- if (LOG.isInfoEnabled()) {
- LOG.info("Failover timed out after " + (end - start) + "ms");
- }
+ LOG.info("Failover timed out after {} ms", (end - start));
break;
}
try {
reconnectMutex.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Interupted: " + e, e);
- }
+ LOG.debug("Interupted:", e);
}
transport = connectedTransport.get();
}
@@ -650,7 +647,7 @@ public class FailoverTransport implements CompositeTransport {
try {
tracked = stateTracker.track(command);
} catch (IOException ioe) {
- LOG.debug("Cannot track the command " + command, ioe);
+ LOG.debug("Cannot track the command {} {}", command, ioe);
}
// If it was a request and it was not being tracked by
// the state tracker,
@@ -691,19 +688,14 @@ public class FailoverTransport implements CompositeTransport {
} else {
// Handle the error but allow the method to return since the
// tracked commands are replayed on reconnect.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
- }
+ LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
handleTransportFailure(e);
}
}
return;
-
} catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
- }
+ LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
handleTransportFailure(e);
}
}
@@ -774,7 +766,7 @@ public class FailoverTransport implements CompositeTransport {
}
} catch (Exception e) {
- LOG.error("Failed to parse URI: " + u);
+ LOG.error("Failed to parse URI: {}", u);
}
}
@@ -818,9 +810,9 @@ public class FailoverTransport implements CompositeTransport {
if (removed) {
l.add(failedConnectTransportURI);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("urlList connectionList:" + l + ", from: " + uris);
- }
+
+ LOG.debug("urlList connectionList:{}, from: {}", l, uris);
+
return l;
}
@@ -861,9 +853,7 @@ public class FailoverTransport implements CompositeTransport {
tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
}
for (Command command : tmpMap.values()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("restore requestMap, replay: " + command);
- }
+ LOG.trace("restore requestMap, replay: {}", command);
t.oneway(command);
}
}
@@ -916,7 +906,7 @@ public class FailoverTransport implements CompositeTransport {
}
newUris = buffer.toString();
} catch (IOException ioe) {
- LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+ LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe);
} finally {
if (in != null) {
try {
@@ -954,9 +944,7 @@ public class FailoverTransport implements CompositeTransport {
doRebalance = false;
return false;
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
- }
+ LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);
try {
Transport transport = this.connectedTransport.getAndSet(null);
@@ -964,9 +952,7 @@ public class FailoverTransport implements CompositeTransport {
disposeTransport(transport);
}
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Caught an exception stopping existing transport for rebalance", e);
- }
+ LOG.debug("Caught an exception stopping existing transport for rebalance", e);
}
}
doRebalance = false;
@@ -988,7 +974,7 @@ public class FailoverTransport implements CompositeTransport {
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
- myTransportListener.onCommand(bt.getBrokerInfo());
+ processCommand(bt.getBrokerInfo());
if (priorityBackup && priorityBackupAvailable) {
Transport old = this.connectedTransport.getAndSet(null);
if (old != null) {
@@ -1023,19 +1009,17 @@ public class FailoverTransport implements CompositeTransport {
transport = TransportFactory.compositeConnect(uri);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Attempting " + connectFailures + "th connect to: " + uri);
- }
- transport.setTransportListener(myTransportListener);
+ LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);
+
+ transport.setTransportListener(createTransportListener(transport));
transport.start();
- if (started && !firstConnection) {
+ if (started && !firstConnection) {
restoreTransport(transport);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connection established");
- }
+ LOG.debug("Connection established");
+
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
@@ -1058,33 +1042,26 @@ public class FailoverTransport implements CompositeTransport {
if (transportListener != null) {
transportListener.transportResumed();
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("transport resumed by transport listener not set");
- }
+ LOG.debug("transport resumed by transport listener not set");
}
if (firstConnection) {
firstConnection = false;
- LOG.info("Successfully connected to " + uri);
+ LOG.info("Successfully connected to {}", uri);
} else {
- LOG.info("Successfully reconnected to " + uri);
+ LOG.info("Successfully reconnected to {}", uri);
}
return false;
} catch (Exception e) {
failure = e;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connect fail to: " + uri + ", reason: " + e);
- }
+ LOG.debug("Connect fail to: {}, reason: {}", uri, e);
if (transport != null) {
try {
transport.stop();
transport = null;
} catch (Exception ee) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Stop of failed transport: " + transport +
- " failed with reason: " + ee);
- }
+ LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
}
}
} finally {
@@ -1098,7 +1075,7 @@ public class FailoverTransport implements CompositeTransport {
connectFailures++;
if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
- LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
+ LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures);
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been
@@ -1406,9 +1383,9 @@ public class FailoverTransport implements CompositeTransport {
} catch(IOException e) {
if (firstAddr == null) {
- LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
+ LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e);
} else {
- LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
+ LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e);
}
if (first.getHost().equalsIgnoreCase(second.getHost())) {