You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2015/08/06 16:26:52 UTC
svn commit: r1694509 - in /qpid/java/trunk:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/failover/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/test/java/org/apache/qpid/client/protocol/ systests/src/test/java/org/apache/qpid/test/unit/client/protocol/
Author: orudyy
Date: Thu Aug 6 14:26:51 2015
New Revision: 1694509
URL: http://svn.apache.org/r1694509
Log:
QPID-3521: Restore AMQConnection#doWithAllLocks(Runnable) and move part of failover specific functionality from FailoverHandler into AMQProtocolHandler
Reduce coupling between AMQProtocolHandler and AMQProtocolSession by swapping some methods
Remove unneeded ConnectionHelper
Removed:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/ConnectionHelper.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java Thu Aug 6 14:26:51 2015
@@ -1818,6 +1818,47 @@ public class AMQConnection extends Close
return _messageCompressionThresholdSize;
}
+ public void doWithAllLocks(Runnable r)
+ {
+ doWithAllLocks(r, _sessions.values());
+
+ }
+
+ private void doWithAllLocks(final Runnable r, final List<AMQSession> sessions)
+ {
+ if (!sessions.isEmpty())
+ {
+ AMQSession session = sessions.remove(0);
+
+ final Object dispatcherLock = session.getDispatcherLock();
+ if (dispatcherLock != null)
+ {
+ synchronized (dispatcherLock)
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (session.getMessageDeliveryLock())
+ {
+ doWithAllLocks(r, sessions);
+ }
+ }
+ }
+ else
+ {
+ synchronized (getFailoverMutex())
+ {
+ r.run();
+ }
+ }
+ }
+
+
public String getTemporaryQueuePrefix()
{
if(_delegate.isVirtualHostPropertiesSupported())
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Thu Aug 6 14:26:51 2015
@@ -313,7 +313,7 @@ public class AMQConnectionDelegate_0_10
final AtomicBoolean failoverDone = new AtomicBoolean();
- ConnectionHelper.doWithAllConnectionAndSessionLocks(_conn, new Runnable()
+ _conn.doWithAllLocks(new Runnable()
{
@Override
public void run()
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Aug 6 14:26:51 2015
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.client.failover;
-import java.util.concurrent.CountDownLatch;
-
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.ConnectionHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,40 +35,7 @@ import org.apache.qpid.client.state.AMQS
* connections, failing over to a new connection if the transport connection fails. The procedure to establish a new
* connection is expressed as a continuation, in order that it may be run in a separate thread to the i/o thread that
* detected the failure and is used to handle the communication to establish a new connection.
- * <p>
- * The reason this needs to be a separate thread is because this work cannot be done inside the i/o processor
- * thread. The significant task is the connection setup which involves a protocol exchange until a particular state
- * is achieved. This procedure waits until the state is achieved which would prevent the i/o thread doing the work
- * it needs to do to achieve the new state.
- * <p>
- * The failover procedure does the following:
- *
- * <ol>
- * <li>Sets the failing over condition to true.</li>
- * <li>Creates a {@link FailoverException} and gets the protocol connection handler to propagate this event to all
- * interested parties.</li>
- * <li>Takes the failover mutex on the protocol connection handler.</li>
- * <li>Abandons the fail over if any of the interested parties vetoes it. The mutex is released and the condition
- * reset.</li>
- * <li>Creates a new {@link AMQStateManager} and re-established the connection through it.</li>
- * <li>Informs the AMQConnection if the connection cannot be re-established.</li>
- * <li>Recreates all sessions from the old connection to the new.</li>
- * <li>Resets the failing over condition and releases the mutex.</li>
- * </ol>
*
- * <p>
- * TODO The failover latch and mutex are used like a lock and condition. If the retrotranlator supports lock/condition
- * then could change over to using them. 1.4 support still needed.
- * <p>
- * TODO If the condition is set to null on a vetoes fail-over and there are already other threads waiting on the
- * condition, they will never be released. It might be an idea to reset the condition in a finally block.
- * <p>
- * TODO Creates a {@link AMQDisconnectedException} and passes it to the AMQConnection. No need to use an
- * exception-as-argument here, could just as easily call a specific method for this purpose on AMQConnection.
- * <p>
- * TODO Creates a {@link FailoverException} and propagates it to the MethodHandlers. No need to use an
- * exception-as-argument here, could just as easily call a specific method for this purpose on
- * {@link org.apache.qpid.protocol.AMQMethodListener}.
*/
public class FailoverHandler implements Runnable
{
@@ -103,36 +67,6 @@ public class FailoverHandler implements
*/
public void run()
{
- if (Thread.currentThread().isDaemon())
- {
- throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
- }
-
- // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
- // the fail over.
- _amqProtocolHandler.setFailoverLatch(new CountDownLatch(1));
-
- // We wake up listeners. If they can handle failover, they will extend the
- // FailoverRetrySupport class and will in turn block on the latch until failover
- // has completed before retrying the operation.
- _amqProtocolHandler.notifyFailoverStarting();
-
- final AMQConnection connection = _amqProtocolHandler.getConnection();
-
- ConnectionHelper.doWithAllConnectionAndSessionLocks(connection, new Runnable()
- {
- @Override
- public void run()
- {
- performFailover();
- }
- });
-
- _amqProtocolHandler.getFailoverLatch().countDown();
- }
-
- private void performFailover()
- {
AMQConnection connection = _amqProtocolHandler.getConnection();
// brace to keep indentation
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Aug 6 14:26:51 2015
@@ -146,6 +146,9 @@ public class AMQProtocolHandler implemen
private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+ private int _queueId = 1;
+ private final Object _queueIdLock = new Object();
+
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -263,7 +266,32 @@ public class AMQProtocolHandler implemen
final Thread failoverThread;
try
{
- failoverThread = Threading.getThreadFactory().createThread(_failoverHandler);
+ failoverThread = Threading.getThreadFactory().createThread(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ if (Thread.currentThread().isDaemon())
+ {
+ throw new IllegalStateException("FailoverHandler must run on a non-daemon thread.");
+ }
+
+ // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of
+ // the fail over.
+ setFailoverLatch(new CountDownLatch(1));
+
+ // We wake up listeners. If they can handle failover, they will extend the
+ // FailoverRetrySupport class and will in turn block on the latch until failover
+ // has completed before retrying the operation.
+ notifyFailoverStarting();
+
+ getConnection().doWithAllLocks(_failoverHandler);
+
+ getFailoverLatch().countDown();
+ }
+ });
}
catch (Exception e)
{
@@ -753,7 +781,15 @@ public class AMQProtocolHandler implemen
public String generateQueueName()
{
- return _protocolSession.generateQueueName();
+ int id;
+ synchronized (_queueIdLock)
+ {
+ id = _queueId++;
+ }
+ // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
+ String localAddress = getLocalAddress().toString().replaceAll("[./:;]", "_");
+ String queueName = "tmp_" + localAddress + "_" + id;
+ return queueName.replaceAll("_+", "_");
}
public CountDownLatch getFailoverLatch()
@@ -829,6 +865,7 @@ public class AMQProtocolHandler implemen
{
_network = network;
_sender = sender;
+ _protocolSession.setSender(sender);
}
@Override
@@ -848,16 +885,6 @@ public class AMQProtocolHandler implemen
return _sender;
}
- void initHeartbeats(int delay, float timeoutFactor)
- {
- if (delay > 0)
- {
- _network.setMaxWriteIdle(delay);
- int readerIdle = (int)(delay * timeoutFactor);
- _network.setMaxReadIdle(readerIdle);
- }
- }
-
public NetworkConnection getNetworkConnection()
{
return _network;
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Aug 6 14:26:51 2015
@@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQVersi
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* Wrapper for protocol session that provides type-safe access to session attributes.
@@ -81,9 +82,6 @@ public class AMQProtocolSession implemen
private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
- private int _queueId = 1;
- private final Object _queueIdLock = new Object();
-
private ProtocolVersion _protocolVersion;
private final MethodRegistry _methodRegistry =
@@ -102,6 +100,7 @@ public class AMQProtocolSession implemen
private SaslClient _saslClient;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
+ private volatile ByteBufferSender _sender;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
{
@@ -109,7 +108,7 @@ public class AMQProtocolSession implemen
_protocolVersion = connection.getProtocolVersion();
if (_logger.isDebugEnabled())
{
- _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
+ _logger.debug("Using ProtocolVersion for Session:" + _protocolVersion);
}
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
this);
@@ -145,7 +144,19 @@ public class AMQProtocolSession implemen
con.setMaximumChannelCount(params.getChannelMax());
con.setMaximumFrameSize(params.getFrameMax());
- _protocolHandler.initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+
+ initHeartbeats(params.getHeartbeat(), params.getHeartbeatTimeoutFactor());
+ }
+
+ void initHeartbeats(int delay, float timeoutFactor)
+ {
+ if (delay > 0)
+ {
+ NetworkConnection network = getProtocolHandler().getNetworkConnection();
+ network.setMaxWriteIdle(delay);
+ int readerIdle = (int)(delay * timeoutFactor);
+ network.setMaxReadIdle(readerIdle);
+ }
}
public String getClientID()
@@ -313,7 +324,7 @@ public class AMQProtocolSession implemen
{
if (_logger.isDebugEnabled())
{
- _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
+ _logger.debug("closeSession called on protocol session for session " + session.getChannelId());
}
final int channelId = session.getChannelId();
if (channelId <= 0)
@@ -380,20 +391,7 @@ public class AMQProtocolSession implemen
public ByteBufferSender getSender()
{
- return _protocolHandler.getSender();
- }
-
- protected String generateQueueName()
- {
- int id;
- synchronized (_queueIdLock)
- {
- id = _queueId++;
- }
- // convert '.', '/', ':' and ';' to single '_', for spec compliance and readability
- String localAddress = _protocolHandler.getLocalAddress().toString().replaceAll("[./:;]", "_");
- String queueName = "tmp_" + localAddress + "_" + id;
- return queueName.replaceAll("_+", "_");
+ return _sender;
}
public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
@@ -454,7 +452,7 @@ public class AMQProtocolSession implemen
public void setSender(ByteBufferSender sender)
{
- // No-op, interface munging
+ _sender = sender;
}
Modified: qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1694509&r1=1694508&r2=1694509&view=diff
==============================================================================
--- qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/java/trunk/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Aug 6 14:26:51 2015
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.client.protocol;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -129,6 +133,40 @@ public class AMQProtocolHandlerTest exte
AMQConstant.INTERNAL_ERROR, ((AMQException)_listener.getReceivedException()).getErrorCode());
}
+
+ public void testTemporaryQueueWildcard() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(1234), "tmp_0_0_0_0_0_0_0_0_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostAddr() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 1234), "tmp_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueLocalhostName() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("localhost"), 1234), "tmp_localhost_127_0_0_1_1234_1");
+ }
+
+ public void testTemporaryQueueInet4() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("192.168.1.2"), 1234), "tmp_192_168_1_2_1234_1");
+ }
+
+ public void testTemporaryQueueInet6() throws UnknownHostException
+ {
+ checkTempQueueName(new InetSocketAddress(InetAddress.getByName("1080:0:0:0:8:800:200C:417A"), 1234), "tmp_1080_0_0_0_8_800_200c_417a_1234_1");
+ }
+
+ private void checkTempQueueName(SocketAddress address, String queueName)
+ {
+ TestNetworkConnection networkConnection = new TestNetworkConnection();
+ networkConnection.setLocalAddress(address);
+ _handler.setNetworkConnection(networkConnection);
+ assertEquals("Wrong queue name", queueName, _handler.generateQueueName());
+ }
+
/**
* This is the main test method for both test cases.
*
@@ -288,5 +326,4 @@ public class AMQProtocolHandlerTest exte
return _receivedException;
}
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org