You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/08/08 13:56:37 UTC
qpid-jms git commit: QPIDJMS-407 Address thread unsafe failover
requests handling
Repository: qpid-jms
Updated Branches:
refs/heads/master e2e0cee97 -> fe90adaf2
QPIDJMS-407 Address thread unsafe failover requests handling
Address thread-unsafe handling of in-progress requests being added to and
removed from the request tracking map, which led to issues where reconnect
and recovery lost track of pending requests.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/fe90adaf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/fe90adaf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/fe90adaf
Branch: refs/heads/master
Commit: fe90adaf2f63444c09ddb6b058f03568cacb04ef
Parents: e2e0cee
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Aug 8 09:56:21 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Aug 8 09:56:21 2018 -0400
----------------------------------------------------------------------
.../jms/provider/failover/FailoverProvider.java | 49 ++++++++++++++------
1 file changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/fe90adaf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 654ec48..ad4752c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.List;
@@ -99,7 +100,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
private final AtomicBoolean failed = new AtomicBoolean();
private final AtomicBoolean closingConnection = new AtomicBoolean(false);
private final AtomicLong requestId = new AtomicLong();
- private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>();
+ private final Map<Long, FailoverRequest> requests = Collections.synchronizedMap(new LinkedHashMap<Long, FailoverRequest>());
private final DefaultProviderListener closedListener = new DefaultProviderListener();
private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>();
private final ProviderFutureFactory futureFactory;
@@ -178,9 +179,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
public void run() {
try {
IOException error = failureCause != null ? failureCause : new IOException("Connection closed");
- List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+ final List<FailoverRequest> pending;
+ synchronized (requests) {
+ pending = new ArrayList<FailoverRequest>(requests.values());
+ }
for (FailoverRequest request : pending) {
- request.onFailure(error);
+ if (!request.isComplete()) {
+ request.onFailure(error);
+ }
}
if (requestTimeoutTask != null) {
@@ -558,11 +564,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
if (listener != null) {
listener.onConnectionInterrupted(failedURI);
}
-
- if (!requests.isEmpty()) {
- for (FailoverRequest request : requests.values()) {
- request.whenOffline(cause);
- }
+
+ final List<FailoverRequest> pending;
+ synchronized (requests) {
+ pending = new ArrayList<FailoverRequest>(requests.values());
+ }
+ for (FailoverRequest request : pending) {
+ request.whenOffline(cause);
}
// Start watching for request timeouts while we are offline, unless we already are.
@@ -626,9 +634,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
listener.onConnectionRestored(provider.getRemoteURI());
// Last step: Send pending actions.
- List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+ final List<FailoverRequest> pending;
+ synchronized (requests) {
+ pending = new ArrayList<FailoverRequest>(requests.values());
+ }
for (FailoverRequest request : pending) {
- request.run();
+ if (!request.isComplete()) {
+ request.run();
+ }
}
reconnectControl.connectionEstablished();
@@ -636,9 +649,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
processAlternates(provider.getAlternateURIs());
// Last step: Send pending actions.
- List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+ final List<FailoverRequest> pending;
+ synchronized (requests) {
+ pending = new ArrayList<FailoverRequest>(requests.values());
+ }
for (FailoverRequest request : pending) {
- request.run();
+ if (!request.isComplete()) {
+ request.run();
+ }
}
}
@@ -1084,8 +1102,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
@Override
public void run() {
- List<FailoverRequest> copied = new ArrayList<FailoverRequest>(requests.values());
- for (FailoverRequest request : copied) {
+ final List<FailoverRequest> pending;
+ synchronized (requests) {
+ pending = new ArrayList<FailoverRequest>(requests.values());
+ }
+ for (FailoverRequest request : pending) {
if (request.isExpired()) {
LOG.trace("Task {} has timed out, sending failure notice.", request);
request.onFailure(request.createTimedOutException());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org