You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/04/18 15:46:03 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6248

Repository: activemq
Updated Branches:
  refs/heads/master 11622b3af -> 23a5beb86


https://issues.apache.org/jira/browse/AMQ-6248

Prevent concurrent calls to handleTransportFailure from closing an
already reconnected transport instance.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/23a5beb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/23a5beb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/23a5beb8

Branch: refs/heads/master
Commit: 23a5beb86c3dac151ae3ed242ce506aa0048bd03
Parents: 11622b3
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Apr 18 09:45:45 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Apr 18 09:45:45 2016 -0400

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.java   | 195 ++++++++-----------
 1 file changed, 86 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/23a5beb8/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 553036b..c0bc9da 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -111,9 +111,7 @@ public class FailoverTransport implements CompositeTransport {
     private boolean trackMessages = false;
     private boolean trackTransactionProducers = true;
     private int maxCacheSize = 128 * 1024;
-    private final TransportListener disposedListener = new DefaultTransportListener() {
-    };
-    private final TransportListener myTransportListener = createTransportListener();
+    private final TransportListener disposedListener = new DefaultTransportListener() {};
     private boolean updateURIsSupported = true;
     private boolean reconnectSupported = true;
     // remember for reconnect thread
@@ -180,61 +178,61 @@ public class FailoverTransport implements CompositeTransport {
         }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
     }
 
-    TransportListener createTransportListener() {
+    private void processCommand(Object incoming) {
+        Command command = (Command) incoming;
+        if (command == null) {
+            return;
+        }
+        if (command.isResponse()) {
+            Object object = null;
+            synchronized (requestMap) {
+                object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
+            }
+            if (object != null && object.getClass() == Tracked.class) {
+                ((Tracked) object).onResponses(command);
+            }
+        }
+
+        if (command.isConnectionControl()) {
+            handleConnectionControl((ConnectionControl) command);
+        } else if (command.isConsumerControl()) {
+            ConsumerControl consumerControl = (ConsumerControl)command;
+            if (consumerControl.isClose()) {
+                stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
+            }
+        }
+
+        if (transportListener != null) {
+            transportListener.onCommand(command);
+        }
+    }
+
+    private TransportListener createTransportListener(final Transport owner) {
         return new TransportListener() {
+
             @Override
             public void onCommand(Object o) {
-                Command command = (Command) o;
-                if (command == null) {
-                    return;
-                }
-                if (command.isResponse()) {
-                    Object object = null;
-                    synchronized (requestMap) {
-                        object = requestMap.remove(Integer.valueOf(((Response) command).getCorrelationId()));
-                    }
-                    if (object != null && object.getClass() == Tracked.class) {
-                        ((Tracked) object).onResponses(command);
-                    }
-                }
-
-                if (command.isConnectionControl()) {
-                    handleConnectionControl((ConnectionControl) command);
-                }
-                else if (command.isConsumerControl()) {
-                    ConsumerControl consumerControl = (ConsumerControl)command;
-                    if (consumerControl.isClose()) {
-                        stateTracker.processRemoveConsumer(consumerControl.getConsumerId(), RemoveInfo.LAST_DELIVERED_UNKNOWN);
-                    }
-
-                }
-                if (transportListener != null) {
-                    transportListener.onCommand(command);
-                }
+                processCommand(o);
             }
 
             @Override
             public void onException(IOException error) {
                 try {
-                    handleTransportFailure(error);
+                    handleTransportFailure(owner, error);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    transportListener.onException(new InterruptedIOException());
+                    if (transportListener != null) {
+                        transportListener.onException(new InterruptedIOException());
+                    }
                 }
             }
 
             @Override
             public void transportInterupted() {
-                if (transportListener != null) {
-                    transportListener.transportInterupted();
-                }
             }
 
             @Override
             public void transportResumed() {
-                if (transportListener != null) {
-                    transportListener.transportResumed();
-                }
             }
         };
     }
@@ -245,6 +243,10 @@ public class FailoverTransport implements CompositeTransport {
     }
 
     public final void handleTransportFailure(IOException e) throws InterruptedException {
+        handleTransportFailure(getConnectedTransport(), e);
+    }
+
+    public final void handleTransportFailure(Transport failed, IOException e) throws InterruptedException {
         if (shuttingDown) {
             // shutdown info sent and remote socket closed and we see that before a local close
             // let the close do the work
@@ -256,21 +258,25 @@ public class FailoverTransport implements CompositeTransport {
         }
 
         // could be blocked in write with the reconnectMutex held, but still needs to be whacked
-        Transport transport = connectedTransport.getAndSet(null);
-        if (transport != null) {
-            disposeTransport(transport);
+        Transport transport = null;
+
+        if (connectedTransport.compareAndSet(failed, null)) {
+            transport = failed;
+            if (transport != null) {
+                disposeTransport(transport);
+            }
         }
 
         synchronized (reconnectMutex) {
             if (transport != null && connectedTransport.get() == null) {
-
                 boolean reconnectOk = false;
 
                 if (canReconnect()) {
                     reconnectOk = true;
                 }
-                LOG.warn("Transport (" + connectedTransportURI + ") failed"
-                        + (reconnectOk ? "," : ", not") + " attempting to automatically reconnect", e);
+
+                LOG.warn("Transport ({}) failed {} attempting to automatically reconnect: {}",
+                         connectedTransport, (reconnectOk ? "," : ", not"), e);
 
                 failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
@@ -347,9 +353,7 @@ public class FailoverTransport implements CompositeTransport {
     @Override
     public void start() throws Exception {
         synchronized (reconnectMutex) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Started " + this);
-            }
+            LOG.debug("Started {}", this);
             if (started) {
                 return;
             }
@@ -373,7 +377,7 @@ public class FailoverTransport implements CompositeTransport {
         try {
             synchronized (reconnectMutex) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Stopped " + this);
+                    LOG.debug("Stopped {}", this);
                 }
                 if (!started) {
                     return;
@@ -407,9 +411,7 @@ public class FailoverTransport implements CompositeTransport {
         }
         for (Transport transport : backupsToStop) {
             try {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Stopped backup: " + transport);
-                }
+                LOG.trace("Stopped backup: {}", transport);
                 disposeTransport(transport);
             } catch (Exception e) {
             }
@@ -581,7 +583,7 @@ public class FailoverTransport implements CompositeTransport {
                         if (command.isResponseRequired()) {
                             Response response = new Response();
                             response.setCorrelationId(command.getCommandId());
-                            myTransportListener.onCommand(response);
+                            processCommand(response);
                         }
                         return;
                     } else if (command instanceof MessagePull) {
@@ -591,7 +593,7 @@ public class FailoverTransport implements CompositeTransport {
                             MessageDispatch dispatch = new MessageDispatch();
                             dispatch.setConsumerId(pullRequest.getConsumerId());
                             dispatch.setDestination(pullRequest.getDestination());
-                            myTransportListener.onCommand(dispatch);
+                            processCommand(dispatch);
                         }
                         return;
                     }
@@ -607,24 +609,19 @@ public class FailoverTransport implements CompositeTransport {
                         boolean timedout = false;
                         while (transport == null && !disposed && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted() && willReconnect()) {
-                            if (LOG.isTraceEnabled()) {
-                                LOG.trace("Waiting for transport to reconnect..: " + command);
-                            }
+
+                            LOG.trace("Waiting for transport to reconnect..: {}", command);
                             long end = System.currentTimeMillis();
                             if (command.isMessage() && timeout > 0 && (end - start > timeout)) {
                                 timedout = true;
-                                if (LOG.isInfoEnabled()) {
-                                    LOG.info("Failover timed out after " + (end - start) + "ms");
-                                }
+                                LOG.info("Failover timed out after {} ms", (end - start));
                                 break;
                             }
                             try {
                                 reconnectMutex.wait(100);
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Interupted: " + e, e);
-                                }
+                                LOG.debug("Interupted:", e);
                             }
                             transport = connectedTransport.get();
                         }
@@ -650,7 +647,7 @@ public class FailoverTransport implements CompositeTransport {
                         try {
                             tracked = stateTracker.track(command);
                         } catch (IOException ioe) {
-                            LOG.debug("Cannot track the command " + command, ioe);
+                            LOG.debug("Cannot track the command {} {}", command, ioe);
                         }
                         // If it was a request and it was not being tracked by
                         // the state tracker,
@@ -691,19 +688,14 @@ public class FailoverTransport implements CompositeTransport {
                             } else {
                                 // Handle the error but allow the method to return since the
                                 // tracked commands are replayed on reconnect.
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
-                                }
+                                LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
                                 handleTransportFailure(e);
                             }
                         }
 
                         return;
-
                     } catch (IOException e) {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug("Send oneway attempt: " + i + " failed for command:" + command);
-                        }
+                        LOG.debug("Send oneway attempt: {} failed for command: {}", i, command);
                         handleTransportFailure(e);
                     }
                 }
@@ -774,7 +766,7 @@ public class FailoverTransport implements CompositeTransport {
             }
 
         } catch (Exception e) {
-            LOG.error("Failed to parse URI: " + u);
+            LOG.error("Failed to parse URI: {}", u);
         }
     }
 
@@ -818,9 +810,9 @@ public class FailoverTransport implements CompositeTransport {
         if (removed) {
             l.add(failedConnectTransportURI);
         }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("urlList connectionList:" + l + ", from: " + uris);
-        }
+
+        LOG.debug("urlList connectionList:{}, from: {}", l, uris);
+
         return l;
     }
 
@@ -861,9 +853,7 @@ public class FailoverTransport implements CompositeTransport {
             tmpMap = new LinkedHashMap<Integer, Command>(requestMap);
         }
         for (Command command : tmpMap.values()) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("restore requestMap, replay: " + command);
-            }
+            LOG.trace("restore requestMap, replay: {}", command);
             t.oneway(command);
         }
     }
@@ -916,7 +906,7 @@ public class FailoverTransport implements CompositeTransport {
                 }
                 newUris = buffer.toString();
             } catch (IOException ioe) {
-                LOG.error("Failed to read updateURIsURL: " + fileURL, ioe);
+                LOG.error("Failed to read updateURIsURL: {} {}",fileURL, ioe);
             } finally {
                 if (in != null) {
                     try {
@@ -954,9 +944,7 @@ public class FailoverTransport implements CompositeTransport {
                             doRebalance = false;
                             return false;
                         } else {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
-                            }
+                            LOG.debug("Doing rebalance from: {} to {}", connectedTransportURI, connectList);
 
                             try {
                                 Transport transport = this.connectedTransport.getAndSet(null);
@@ -964,9 +952,7 @@ public class FailoverTransport implements CompositeTransport {
                                     disposeTransport(transport);
                                 }
                             } catch (Exception e) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Caught an exception stopping existing transport for rebalance", e);
-                                }
+                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
                             }
                         }
                         doRebalance = false;
@@ -988,7 +974,7 @@ public class FailoverTransport implements CompositeTransport {
                             backups.remove(bt);
                             transport = bt.getTransport();
                             uri = bt.getUri();
-                            myTransportListener.onCommand(bt.getBrokerInfo());
+                            processCommand(bt.getBrokerInfo());
                             if (priorityBackup && priorityBackupAvailable) {
                                 Transport old = this.connectedTransport.getAndSet(null);
                                 if (old != null) {
@@ -1023,19 +1009,17 @@ public class FailoverTransport implements CompositeTransport {
                                 transport = TransportFactory.compositeConnect(uri);
                             }
 
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Attempting  " + connectFailures + "th  connect to: " + uri);
-                            }
-                            transport.setTransportListener(myTransportListener);
+                            LOG.debug("Attempting {}th connect to: {}", connectFailures, uri);
+
+                            transport.setTransportListener(createTransportListener(transport));
                             transport.start();
 
-                            if (started &&  !firstConnection) {
+                            if (started && !firstConnection) {
                                 restoreTransport(transport);
                             }
 
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Connection established");
-                            }
+                            LOG.debug("Connection established");
+
                             reconnectDelay = initialReconnectDelay;
                             connectedTransportURI = uri;
                             connectedTransport.set(transport);
@@ -1058,33 +1042,26 @@ public class FailoverTransport implements CompositeTransport {
                             if (transportListener != null) {
                                 transportListener.transportResumed();
                             } else {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("transport resumed by transport listener not set");
-                                }
+                                LOG.debug("transport resumed by transport listener not set");
                             }
 
                             if (firstConnection) {
                                 firstConnection = false;
-                                LOG.info("Successfully connected to " + uri);
+                                LOG.info("Successfully connected to {}", uri);
                             } else {
-                                LOG.info("Successfully reconnected to " + uri);
+                                LOG.info("Successfully reconnected to {}", uri);
                             }
 
                             return false;
                         } catch (Exception e) {
                             failure = e;
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
-                            }
+                            LOG.debug("Connect fail to: {}, reason: {}", uri, e);
                             if (transport != null) {
                                 try {
                                     transport.stop();
                                     transport = null;
                                 } catch (Exception ee) {
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.debug("Stop of failed transport: " + transport +
-                                                  " failed with reason: " + ee);
-                                    }
+                                    LOG.debug("Stop of failed transport: {} failed with reason: {}", transport, ee);
                                 }
                             }
                         } finally {
@@ -1098,7 +1075,7 @@ public class FailoverTransport implements CompositeTransport {
 
             connectFailures++;
             if (reconnectLimit != INFINITE && connectFailures >= reconnectLimit) {
-                LOG.error("Failed to connect to " + uris + " after: " + connectFailures + " attempt(s)");
+                LOG.error("Failed to connect to {} after: {} attempt(s)", uris, connectFailures);
                 connectionFailure = failure;
 
                 // Make sure on initial startup, that the transportListener has been
@@ -1406,9 +1383,9 @@ public class FailoverTransport implements CompositeTransport {
             } catch(IOException e) {
 
                 if (firstAddr == null) {
-                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
+                    LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", first, e);
                 } else {
-                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
+                    LOG.error("Failed to Lookup INetAddress for URI[{}] : {}", second, e);
                 }
 
                 if (first.getHost().equalsIgnoreCase(second.getHost())) {