You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/08/08 13:56:37 UTC

qpid-jms git commit: QPIDJMS-407 Address thread unsafe failover requests handling

Repository: qpid-jms
Updated Branches:
  refs/heads/master e2e0cee97 -> fe90adaf2


QPIDJMS-407 Address thread unsafe failover requests handling

Address thread-unsafe handling of in-progress requests being added to and
removed from the request tracking map, which led to reconnect and recovery
losing track of pending requests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fe90adaf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fe90adaf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fe90adaf

Branch: refs/heads/master
Commit: fe90adaf2f63444c09ddb6b058f03568cacb04ef
Parents: e2e0cee
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Aug 8 09:56:21 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Aug 8 09:56:21 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java | 49 ++++++++++++++------
 1 file changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe90adaf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 654ec48..ad4752c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -99,7 +100,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private final AtomicBoolean failed = new AtomicBoolean();
     private final AtomicBoolean closingConnection = new AtomicBoolean(false);
     private final AtomicLong requestId = new AtomicLong();
-    private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>();
+    private final Map<Long, FailoverRequest> requests = Collections.synchronizedMap(new LinkedHashMap<Long, FailoverRequest>());
     private final DefaultProviderListener closedListener = new DefaultProviderListener();
     private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>();
     private final ProviderFutureFactory futureFactory;
@@ -178,9 +179,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                 public void run() {
                     try {
                         IOException error = failureCause != null ? failureCause : new IOException("Connection closed");
-                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.onFailure(error);
+                            if (!request.isComplete()) {
+                                request.onFailure(error);
+                            }
                         }
 
                         if (requestTimeoutTask != null) {
@@ -558,11 +564,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                 if (listener != null) {
                     listener.onConnectionInterrupted(failedURI);
                 }
-                
-                if (!requests.isEmpty()) {
-                	for (FailoverRequest request : requests.values()) {
-                		request.whenOffline(cause);
-                	}
+
+                final List<FailoverRequest> pending;
+                synchronized (requests) {
+                    pending = new ArrayList<FailoverRequest>(requests.values());
+                }
+                for (FailoverRequest request : pending) {
+                    request.whenOffline(cause);
                 }
 
                 // Start watching for request timeouts while we are offline, unless we already are.
@@ -626,9 +634,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                         listener.onConnectionRestored(provider.getRemoteURI());
 
                         // Last step: Send pending actions.
-                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.run();
+                            if (!request.isComplete()) {
+                                request.run();
+                            }
                         }
 
                         reconnectControl.connectionEstablished();
@@ -636,9 +649,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                         processAlternates(provider.getAlternateURIs());
 
                         // Last step: Send pending actions.
-                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        final List<FailoverRequest> pending;
+                        synchronized (requests) {
+                            pending = new ArrayList<FailoverRequest>(requests.values());
+                        }
                         for (FailoverRequest request : pending) {
-                            request.run();
+                            if (!request.isComplete()) {
+                                request.run();
+                            }
                         }
                     }
 
@@ -1084,8 +1102,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
         @Override
         public void run() {
-            List<FailoverRequest> copied = new ArrayList<FailoverRequest>(requests.values());
-            for (FailoverRequest request : copied) {
+            final List<FailoverRequest> pending;
+            synchronized (requests) {
+                pending = new ArrayList<FailoverRequest>(requests.values());
+            }
+            for (FailoverRequest request : pending) {
                 if (request.isExpired()) {
                     LOG.trace("Task {} has timed out, sending failure notice.", request);
                     request.onFailure(request.createTimedOutException());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org