You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/06 16:26:52 UTC

svn commit: r1694509 - in /qpid/java/trunk: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/test/java/org/apache/qpid/client/protocol/ s...

Author: orudyy
Date: Thu Aug  6 14:26:51 2015
New Revision: 1694509

URL: http://svn.apache.org/r1694509
Log:
QPID-3521: Restore AMQConnection#doWithAllLocks(Runnable) and move part of failover specific functionality from FailoverHandler into AMQProtocolHandler
           Reduce coupling between AMQProtocolHandler and AMQProtocolSession by swapping some methods
           Remove unneeded ConnectionHelper

Removed:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Aug  6 14:26:51 2015
@@ -1818,6 +1818,47 @@ public class AMQConnection extends Close
         return _messageCompressionThresholdSize;
     }
 
+    public void doWithAllLocks(Runnable r)
+    {
+        doWithAllLocks(r, _sessions.values());
+
+    }
+
+    private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
+    {
+        if (!sessions.isEmpty())
+        {
+            AMQSession session = sessions.remove(0);
+
+            final Object dispatcherLock = session.getDispatcherLock();
+            if (dispatcherLock != null)
+            {
+                synchronized (dispatcherLock)
+                {
+                    synchronized (session.getMessageDeliveryLock())
+                    {
+                        doWithAllLocks(r, sessions);
+                    }
+                }
+            }
+            else
+            {
+                synchronized (session.getMessageDeliveryLock())
+                {
+                    doWithAllLocks(r, sessions);
+                }
+            }
+        }
+        else
+        {
+            synchronized (getFailoverMutex())
+            {
+                r.run();
+            }
+        }
+    }
+
+
     public String getTemporaryQueuePrefix()
     {
         if(_delegate.isVirtualHostPropertiesSupported())

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Aug  6 14:26:51 2015
@@ -313,7 +313,7 @@ public class AMQConnectionDelegate_0_10
 
             final AtomicBoolean failoverDone = new AtomicBoolean();
 
-            ConnectionHelper.doWithAllConnectionAndSessionLocks(_conn, new Runnable()
+            _conn.doWithAllLocks(new Runnable()
             {
                 @Override
                 public void run()

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Aug  6 14:26:51 2015
@@ -20,10 +20,7 @@
  */
 package org.apache.qpid.client.failover;
 
-import java.util.concurrent.CountDownLatch;
-
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.ConnectionHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,40 +35,7 @@ import org.apache.qpid.client.state.AMQS
  * connections, failing over to a new connection if the transport connection fails. The procedure to establish a new
  * connection is expressed as a continuation, in order that it may be run in a seperate thread to the i/o thread that
  * detected the failure and is used to handle the communication to establish a new connection.
- * <p>
- * The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor
- * thread. The significant task is the connection setup which involves a protocol exchange until a particular state
- * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work
- * it needs to do to achieve the new state.
- * <p>
- * The failover procedure does the following:
- *
- * <ol>
- * <li>Sets the failing over condition to true.</li>
- * <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all
- *     interested parties.</li>
- * <li>Takes the failover mutex on the protocol connection handler.</li>
- * <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition
- *     reset.</li>
- * <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li>
- * <li>Informs the AMQConnection if the connection cannot be re-established.</li>
- * <li>Recreates all sessions from the old connection to the new.</li>
- * <li>Resets the failing over condition and releases the mutex.</li>
- * </ol>
  *
- * <p>
- * TODO  The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition
- *       then could change over to using them. 1.4 support still needed.
- * <p>
- * TODO  If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the
- *       condition, they will never be released. It might be an idea to reset the condition in a finally block.
- * <p>
- * TODO  Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an
- *       exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection.
- * <p>
- * TODO  Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an
- *       exception-as-argument here, could just as easily call a specific method for this purpose on
- *       {@link org.apache.qpid.protocol.AMQMethodListener}.
  */
 public class FailoverHandler implements Runnable
 {
@@ -103,36 +67,6 @@ public class FailoverHandler implements
      */
     public void run()
     {
-        if (Thread.currentThread().isDaemon())
-        {
-            throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
-        }
-
-        // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
-        // the fail over.
-        _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
-
-        // We wake up listeners. If they can handle failover, they will extend the
-        // FailoverRetrySupport class and will in turn block on the latch until failover
-        // has completed before retrying the operation.
-        _amqProtocolHandler.notifyFailoverStarting();
-
-        final AMQConnection connection = _amqProtocolHandler.getConnection();
-
-        ConnectionHelper.doWithAllConnectionAndSessionLocks(connection, new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                performFailover();
-            }
-        });
-
-        _amqProtocolHandler.getFailoverLatch().countDown();
-    }
-
-    private void performFailover()
-    {
         AMQConnection connection = _amqProtocolHandler.getConnection();
 
         // brace to keep indentation

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Aug  6 14:26:51 2015
@@ -146,6 +146,9 @@ public class AMQProtocolHandler implemen
     private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
     private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
 
+    private int _queueId = 1;
+    private final Object _queueIdLock = new Object();
+
     /**
      * Creates a new protocol handler, associated with the specified client connection instance.
      *
@@ -263,7 +266,32 @@ public class AMQProtocolHandler implemen
             final Thread failoverThread;
             try
             {
-                failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
+                failoverThread = Threading.getThreadFactory().createThread(
+                        new Runnable()
+                        {
+                            @Override
+                            public void run()
+                            {
+
+                                if (Thread.currentThread().isDaemon())
+                                {
+                                    throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
+                                }
+
+                                // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
+                                // the fail over.
+                                setFailoverLatch(new CountDownLatch(1));
+
+                                // We wake up listeners. If they can handle failover, they will extend the
+                                // FailoverRetrySupport class and will in turn block on the latch until failover
+                                // has completed before retrying the operation.
+                                notifyFailoverStarting();
+
+                                getConnection().doWithAllLocks(_failoverHandler);
+
+                                getFailoverLatch().countDown();
+                            }
+                        });
             }
             catch (Exception e)
             {
@@ -753,7 +781,15 @@ public class AMQProtocolHandler implemen
 
     public String generateQueueName()
     {
-        return _protocolSession.generateQueueName();
+        int id;
+        synchronized (_queueIdLock)
+        {
+            id = _queueId++;
+        }
+        // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
+        String localAddress = getLocalAddress().toString().replaceAll("[./:;]", "_");
+        String queueName = "tmp_" + localAddress + "_" + id;
+        return queueName.replaceAll("_+", "_");
     }
 
     public CountDownLatch getFailoverLatch()
@@ -829,6 +865,7 @@ public class AMQProtocolHandler implemen
     {
         _network = network;
         _sender = sender;
+        _protocolSession.setSender(sender);
     }
 
     @Override
@@ -848,16 +885,6 @@ public class AMQProtocolHandler implemen
         return _sender;
     }
 
-    void initHeartbeats(int delay, float timeoutFactor)
-    {
-        if (delay > 0)
-        {
-            _network.setMaxWriteIdle(delay);
-            int readerIdle = (int)(delay * timeoutFactor);
-            _network.setMaxReadIdle(readerIdle);
-        }
-    }
-
     public NetworkConnection getNetworkConnection()
     {
         return _network;

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Aug  6 14:26:51 2015
@@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQVersi
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionSettings;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes.
@@ -81,9 +82,6 @@ public class AMQProtocolSession implemen
     private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<>();
     private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
 
-    private int _queueId = 1;
-    private final Object _queueIdLock = new Object();
-
     private ProtocolVersion _protocolVersion;
 
     private final MethodRegistry _methodRegistry =
@@ -102,6 +100,7 @@ public class AMQProtocolSession implemen
     private SaslClient _saslClient;
 
     private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+    private volatile ByteBufferSender _sender;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
     {
@@ -109,7 +108,7 @@ public class AMQProtocolSession implemen
         _protocolVersion = connection.getProtocolVersion();
         if (_logger.isDebugEnabled())
         {
-        	_logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
+            _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
         }
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
                                                                            this);
@@ -145,7 +144,19 @@ public class AMQProtocolSession implemen
         con.setMaximumChannelCount(params.getChannelMax());
         con.setMaximumFrameSize(params.getFrameMax());
 
-        _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+
+        initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+    }
+
+    void initHeartbeats(int delay, float timeoutFactor)
+    {
+        if (delay > 0)
+        {
+            NetworkConnection network = getProtocolHandler().getNetworkConnection();
+            network.setMaxWriteIdle(delay);
+            int readerIdle = (int)(delay * timeoutFactor);
+            network.setMaxReadIdle(readerIdle);
+        }
     }
 
     public String getClientID()
@@ -313,7 +324,7 @@ public class AMQProtocolSession implemen
     {
         if (_logger.isDebugEnabled())
         {
-        	_logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+            _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
         }
         final int channelId = session.getChannelId();
         if (channelId <= 0)
@@ -380,20 +391,7 @@ public class AMQProtocolSession implemen
 
     public ByteBufferSender getSender()
     {
-        return _protocolHandler.getSender();
-    }
-
-    protected String generateQueueName()
-    {
-        int id;
-        synchronized (_queueIdLock)
-        {
-            id = _queueId++;
-        }
-        // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
-        String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
-        String queueName = "tmp_" + localAddress + "_" + id;
-        return queueName.replaceAll("_+", "_");
+        return _sender;
     }
 
     public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
@@ -454,7 +452,7 @@ public class AMQProtocolSession implemen
 
     public void setSender(ByteBufferSender sender)
     {
-        // No-op, interface munging
+        _sender = sender;
     }
 
 

Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Aug  6 14:26:51 2015
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -129,6 +133,40 @@ public class AMQProtocolHandlerTest exte
                      AMQConstant.INTERNAL_ERROR,  ((AMQException)_listener.getReceivedException()).getErrorCode());
     }
 
+
+    public void testTemporaryQueueWildcard() throws UnknownHostException
+    {
+        checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1");
+    }
+
+    public void testTemporaryQueueLocalhostAddr() throws UnknownHostException
+    {
+        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1");
+    }
+
+    public void testTemporaryQueueLocalhostName() throws UnknownHostException
+    {
+        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1");
+    }
+
+    public void testTemporaryQueueInet4() throws UnknownHostException
+    {
+        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1");
+    }
+
+    public void testTemporaryQueueInet6() throws UnknownHostException
+    {
+        checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
+    }
+
+    private void checkTempQueueName(SocketAddress address, String queueName)
+    {
+        TestNetworkConnection networkConnection = new TestNetworkConnection();
+        networkConnection.setLocalAddress(address);
+        _handler.setNetworkConnection(networkConnection);
+        assertEquals("Wrong queue name", queueName, _handler.generateQueueName());
+    }
+
     /**
      * This is the main test method for both test cases.
      *
@@ -288,5 +326,4 @@ public class AMQProtocolHandlerTest exte
             return _receivedException;
         }
     }
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org