You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/25 12:20:31 UTC
svn commit: r1496401 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/connection/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker/src/main/java/org/apache/qp...
Author: rgodfrey
Date: Tue Jun 25 10:20:31 2013
New Revision: 1496401
URL: http://svn.apache.org/r1496401
Log:
QPID-4946 : [Java Broker] closing the broker may result in same message being delivered to multipl competing consumers
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Tue Jun 25 10:20:31 2013
@@ -52,6 +52,13 @@ public class ConnectionRegistry implemen
public void close(final String replyText)
{
+ synchronized(this)
+ {
+ for(AMQConnectionModel conn : _registry)
+ {
+ conn.stop();
+ }
+ }
if (_logger.isDebugEnabled())
{
_logger.debug("Closing connection registry :" + _registry.size() + " connections.");
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Tue Jun 25 10:20:31 2013
@@ -89,4 +89,7 @@ public interface AMQConnectionModel exte
Transport getTransport();
+ void stop();
+
+ boolean isStopped();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jun 25 10:20:31 2013
@@ -190,6 +190,7 @@ public class AMQProtocolEngine implement
private final Transport _transport;
private volatile boolean _closeWhenNoRoute;
+ private volatile boolean _stopped;
public AMQProtocolEngine(Broker broker,
NetworkConnection network,
@@ -1304,6 +1305,18 @@ public class AMQProtocolEngine implement
return _transport;
}
+ @Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Jun 25 10:20:31 2013
@@ -149,6 +149,8 @@ public class Connection_1_0 implements C
}
};
+ private volatile boolean _stopped;
+
@Override
public void close(AMQConstant cause, String message) throws AMQException
{
@@ -252,6 +254,18 @@ public class Connection_1_0 implements C
}
@Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
+ @Override
public void initialiseStatistics()
{
_messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Tue Jun 25 10:20:31 2013
@@ -163,7 +163,7 @@ class Subscription_1_0 implements Subscr
public boolean isSuspended()
{
- return !isActive();// || !getEndpoint().hasCreditToSend();
+ return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue Jun 25 10:20:31 2013
@@ -463,7 +463,7 @@ public abstract class SubscriptionImpl i
public boolean isSuspended()
{
- return !isActive() || _channel.isSuspended() || _deleted.get();
+ return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
}
/**
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Jun 25 10:20:31 2013
@@ -198,7 +198,7 @@ public class Subscription_0_10 implement
public boolean isSuspended()
{
- return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
+ return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
}
public boolean hasInterest(QueueEntry entry)
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Jun 25 10:20:31 2013
@@ -74,6 +74,7 @@ public class ServerConnection extends Co
private Principal _peerPrincipal;
private NetworkConnection _networkConnection;
private Transport _transport;
+ private volatile boolean _stopped;
public ServerConnection(final long connectionId)
{
@@ -169,6 +170,18 @@ public class ServerConnection extends Co
return _transport;
}
+ @Override
+ public void stop()
+ {
+ _stopped = true;
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
public void setTransport(Transport transport)
{
_transport = transport;
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Jun 25 10:20:31 2013
@@ -553,5 +553,16 @@ public class MockSubscription implements
{
return null;
}
+
+ @Override
+ public void stop()
+ {
+ }
+
+ @Override
+ public boolean isStopped()
+ {
+ return false;
+ }
}
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Tue Jun 25 10:20:31 2013
@@ -23,6 +23,11 @@ package org.apache.qpid.test.unit.client
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.naming.NamingException;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.AMQConnection;
@@ -45,6 +50,7 @@ public class BrokerClosesClientConnectio
private Connection _connection;
private boolean _isExternalBroker;
private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener();
+ private Session _session;
@Override
protected void setUp() throws Exception
@@ -52,7 +58,7 @@ public class BrokerClosesClientConnectio
super.setUp();
_connection = getConnection();
- _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_connection.setExceptionListener(_recordingExceptionListener);
_isExternalBroker = isExternalBroker();
@@ -140,4 +146,70 @@ public class BrokerClosesClientConnectio
return _exception;
}
}
+
+
+ private class Listener implements MessageListener
+ {
+ int _messageCount;
+
+ @Override
+ public synchronized void onMessage(Message message)
+ {
+ _messageCount++;
+ }
+
+ public synchronized int getCount()
+ {
+ return _messageCount;
+ }
+ }
+
+ public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException
+ {
+
+ Listener listener = new Listener();
+
+ Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer1 = session.createConsumer(getTestQueue());
+ consumer1.setMessageListener(listener);
+
+ MessageProducer producer = _session.createProducer(getTestQueue());
+ producer.send(_session.createTextMessage("test message"));
+
+ _connection.start();
+
+
+ synchronized (listener)
+ {
+ long currentTime = System.currentTimeMillis();
+ long until = currentTime + 2000l;
+ while(listener.getCount() == 0 && currentTime < until)
+ {
+ listener.wait(until - currentTime);
+ currentTime = System.currentTimeMillis();
+ }
+ }
+ assertEquals(1, listener.getCount());
+
+ Connection connection2 = getConnection();
+ Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer2 = session2.createConsumer(getTestQueue());
+ consumer2.setMessageListener(listener);
+ connection2.start();
+
+
+ Connection connection3 = getConnection();
+ Session session3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer3 = session3.createConsumer(getTestQueue());
+ consumer3.setMessageListener(listener);
+ connection3.start();
+
+ assertEquals(1, listener.getCount());
+
+ stopBroker();
+
+ assertEquals(1, listener.getCount());
+
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org