You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/12 10:46:25 UTC
qpid-jms-amqp-0-x git commit: QPID-7774: Improve locking when using failover latch
Repository: qpid-jms-amqp-0-x
Updated Branches:
refs/heads/master 948c341da -> ce8996aeb
QPID-7774: Improve locking when using failover latch
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/ce8996ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/ce8996ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/ce8996ae
Branch: refs/heads/master
Commit: ce8996aebe14f1b6c9d02ec123e8ccda8fd8988b
Parents: 948c341
Author: Alex Rudyy <or...@apache.org>
Authored: Fri May 12 10:17:55 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri May 12 10:17:55 2017 +0100
----------------------------------------------------------------------
.../org/apache/qpid/client/AMQConnection.java | 1 -
.../qpid/client/AMQConnectionDelegate_0_10.java | 76 ++++++++++----------
.../apache/qpid/client/AMQProtocolHandler.java | 25 +++----
3 files changed, 48 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQConnection.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5ffb11e..c629414 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -43,7 +43,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index b58ff1e..ba3e4ae 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -331,53 +331,53 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
final ConnectionClose close = exc.getClose();
if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
{
- _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
- _qpidConnection.notifyFailoverRequired();
-
+ final CountDownLatch failoverLatch = new CountDownLatch(1);
+ _conn.getProtocolHandler().setFailoverLatch(failoverLatch);
final AtomicBoolean failoverDone = new AtomicBoolean();
-
- _conn.doWithAllLocks(new Runnable()
+ try
{
- @Override
- public void run()
+ _qpidConnection.notifyFailoverRequired();
+ _conn.doWithAllLocks(new Runnable()
{
- try
+ @Override
+ public void run()
{
- boolean preFailover = _conn.firePreFailover(false);
- if (preFailover)
+ try
{
- boolean reconnected;
- if(exc instanceof RedirectConnectionException)
+ boolean preFailover = _conn.firePreFailover(false);
+ if (preFailover)
{
- RedirectConnectionException redirect = (RedirectConnectionException)exc;
- reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
- }
- else
- {
- reconnected = _conn.attemptReconnection();
- }
- if(reconnected)
- {
- failoverPrep();
- _conn.resubscribeSessions();
- _conn.fireFailoverComplete();
- failoverDone.set(true);
+ boolean reconnected;
+ if (exc instanceof RedirectConnectionException)
+ {
+ RedirectConnectionException redirect = (RedirectConnectionException) exc;
+ reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+ }
+ else
+ {
+ reconnected = _conn.attemptReconnection();
+ }
+ if (reconnected)
+ {
+ failoverPrep();
+ _conn.resubscribeSessions();
+ _conn.fireFailoverComplete();
+ failoverDone.set(true);
+ }
}
}
+ catch (Exception e)
+ {
+ _logger.error("error during failover", e);
+ }
}
- catch (Exception e)
- {
- _logger.error("error during failover", e);
- }
- finally
- {
- _conn.getProtocolHandler().getFailoverLatch().countDown();
- _conn.getProtocolHandler().setFailoverLatch(null);
- }
-
- }
- });
+ });
+ }
+ finally
+ {
+ failoverLatch.countDown();
+ _conn.getProtocolHandler().setFailoverLatch(null);
+ }
if (failoverDone.get())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index cb154ea..f8ec8a8 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -275,7 +275,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
// Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
// the fail over.
- setFailoverLatch(new CountDownLatch(1));
+ final CountDownLatch failoverLatch = new CountDownLatch(1);
+ setFailoverLatch(failoverLatch);
try
{
@@ -288,12 +289,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
}
finally
{
- CountDownLatch failoverLatch = getFailoverLatch();
- if (failoverLatch != null)
- {
- failoverLatch.countDown();
- setFailoverLatch(null);
- }
+ failoverLatch.countDown();
+ setFailoverLatch(null);
}
}
});
@@ -737,14 +734,12 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
public void blockUntilNotFailingOver() throws InterruptedException
{
- synchronized(_failoverLatchChange)
+ CountDownLatch failoverLatch = getFailoverLatch();
+ if (failoverLatch != null)
{
- if (_failoverLatch != null)
+ if (!failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
{
- if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
- {
- _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
- }
+ _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
}
}
}
@@ -762,7 +757,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
return queueName.replaceAll("_+", "_");
}
- public CountDownLatch getFailoverLatch()
+ CountDownLatch getFailoverLatch()
{
synchronized (_failoverLatchChange)
{
@@ -770,7 +765,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
}
}
- public void setFailoverLatch(CountDownLatch failoverLatch)
+ void setFailoverLatch(CountDownLatch failoverLatch)
{
synchronized (_failoverLatchChange)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org