You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/05/12 10:46:25 UTC

qpid-jms-amqp-0-x git commit: QPID-7774: Improve locking when using failover latch

Repository: qpid-jms-amqp-0-x
Updated Branches:
  refs/heads/master 948c341da -> ce8996aeb


QPID-7774: Improve locking when using failover latch


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/ce8996ae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/ce8996ae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/ce8996ae

Branch: refs/heads/master
Commit: ce8996aebe14f1b6c9d02ec123e8ccda8fd8988b
Parents: 948c341
Author: Alex Rudyy <or...@apache.org>
Authored: Fri May 12 10:17:55 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri May 12 10:17:55 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/client/AMQConnection.java   |  1 -
 .../qpid/client/AMQConnectionDelegate_0_10.java | 76 ++++++++++----------
 .../apache/qpid/client/AMQProtocolHandler.java  | 25 +++----
 3 files changed, 48 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQConnection.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index 5ffb11e..c629414 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -43,7 +43,6 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
index b58ff1e..ba3e4ae 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
@@ -331,53 +331,53 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec
         final ConnectionClose close = exc.getClose();
         if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED)
         {
-            _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1));
-
-            _qpidConnection.notifyFailoverRequired();
-
+            final CountDownLatch failoverLatch = new CountDownLatch(1);
+            _conn.getProtocolHandler().setFailoverLatch(failoverLatch);
             final AtomicBoolean failoverDone = new AtomicBoolean();
-
-            _conn.doWithAllLocks(new Runnable()
+            try
             {
-                @Override
-                public void run()
+                _qpidConnection.notifyFailoverRequired();
+                _conn.doWithAllLocks(new Runnable()
                 {
-                    try
+                    @Override
+                    public void run()
                     {
-                        boolean preFailover = _conn.firePreFailover(false);
-                        if (preFailover)
+                        try
                         {
-                            boolean reconnected;
-                            if(exc instanceof RedirectConnectionException)
+                            boolean preFailover = _conn.firePreFailover(false);
+                            if (preFailover)
                             {
-                                RedirectConnectionException redirect = (RedirectConnectionException)exc;
-                                reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
-                            }
-                            else
-                            {
-                                reconnected = _conn.attemptReconnection();
-                            }
-                            if(reconnected)
-                            {
-                                failoverPrep();
-                                _conn.resubscribeSessions();
-                                _conn.fireFailoverComplete();
-                                failoverDone.set(true);
+                                boolean reconnected;
+                                if (exc instanceof RedirectConnectionException)
+                                {
+                                    RedirectConnectionException redirect = (RedirectConnectionException) exc;
+                                    reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts());
+                                }
+                                else
+                                {
+                                    reconnected = _conn.attemptReconnection();
+                                }
+                                if (reconnected)
+                                {
+                                    failoverPrep();
+                                    _conn.resubscribeSessions();
+                                    _conn.fireFailoverComplete();
+                                    failoverDone.set(true);
+                                }
                             }
                         }
+                        catch (Exception e)
+                        {
+                            _logger.error("error during failover", e);
+                        }
                     }
-                    catch (Exception e)
-                    {
-                        _logger.error("error during failover", e);
-                    }
-                    finally
-                    {
-                        _conn.getProtocolHandler().getFailoverLatch().countDown();
-                        _conn.getProtocolHandler().setFailoverLatch(null);
-                    }
-
-                }
-            });
+                });
+            }
+            finally
+            {
+                failoverLatch.countDown();
+                _conn.getProtocolHandler().setFailoverLatch(null);
+            }
 
 
             if (failoverDone.get())

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/ce8996ae/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index cb154ea..f8ec8a8 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -275,7 +275,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
 
                                 // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
                                 // the fail over.
-                                setFailoverLatch(new CountDownLatch(1));
+                                final CountDownLatch failoverLatch = new CountDownLatch(1);
+                                setFailoverLatch(failoverLatch);
 
                                 try
                                 {
@@ -288,12 +289,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
                                 }
                                 finally
                                 {
-                                    CountDownLatch failoverLatch = getFailoverLatch();
-                                    if (failoverLatch != null)
-                                    {
-                                        failoverLatch.countDown();
-                                        setFailoverLatch(null);
-                                    }
+                                    failoverLatch.countDown();
+                                    setFailoverLatch(null);
                                 }
                             }
                         });
@@ -737,14 +734,12 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
 
     public void blockUntilNotFailingOver() throws InterruptedException
     {
-        synchronized(_failoverLatchChange)
+        CountDownLatch failoverLatch = getFailoverLatch();
+        if (failoverLatch != null)
         {
-            if (_failoverLatch != null)
+            if (!failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
             {
-                if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
-                {
-                    _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
-                }
+                _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME);
             }
         }
     }
@@ -762,7 +757,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
         return queueName.replaceAll("_+", "_");
     }
 
-    public CountDownLatch getFailoverLatch()
+    CountDownLatch getFailoverLatch()
     {
         synchronized (_failoverLatchChange)
         {
@@ -770,7 +765,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
         }
     }
 
-    public void setFailoverLatch(CountDownLatch failoverLatch)
+    void setFailoverLatch(CountDownLatch failoverLatch)
     {
         synchronized (_failoverLatchChange)
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org