You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2013/06/25 12:20:31 UTC

svn commit: r1496401 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/connection/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ broker/src/main/java/org/apache/qp...

Author: rgodfrey
Date: Tue Jun 25 10:20:31 2013
New Revision: 1496401

URL: http://svn.apache.org/r1496401
Log:
QPID-4946 : [Java Broker] closing the broker may result in the same message being delivered to multiple competing consumers

Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Tue Jun 25 10:20:31 2013
@@ -52,6 +52,13 @@ public class ConnectionRegistry implemen
 
     public void close(final String replyText)
     {
+        synchronized(this)
+        {
+            for(AMQConnectionModel conn : _registry)
+            {
+                conn.stop();
+            }
+        }
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Closing connection registry :" + _registry.size() + " connections.");

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Tue Jun 25 10:20:31 2013
@@ -89,4 +89,7 @@ public interface AMQConnectionModel exte
 
     Transport getTransport();
 
+    void stop();
+
+    boolean isStopped();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Tue Jun 25 10:20:31 2013
@@ -190,6 +190,7 @@ public class AMQProtocolEngine implement
     private final Transport _transport;
 
     private volatile boolean _closeWhenNoRoute;
+    private volatile boolean _stopped;
 
     public AMQProtocolEngine(Broker broker,
                              NetworkConnection network,
@@ -1304,6 +1305,18 @@ public class AMQProtocolEngine implement
         return _transport;
     }
 
+    @Override
+    public void stop()
+    {
+        _stopped = true;
+    }
+
+    @Override
+    public boolean isStopped()
+    {
+        return _stopped;
+    }
+
     public long getLastReceivedTime()
     {
         return _lastReceivedTime;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Tue Jun 25 10:20:31 2013
@@ -149,6 +149,8 @@ public class Connection_1_0 implements C
             }
         };
 
+        private volatile boolean _stopped;
+
         @Override
         public void close(AMQConstant cause, String message) throws AMQException
         {
@@ -252,6 +254,18 @@ public class Connection_1_0 implements C
         }
 
         @Override
+        public void stop()
+        {
+            _stopped = true;
+        }
+
+        @Override
+        public boolean isStopped()
+        {
+            return _stopped;
+        }
+
+        @Override
         public void initialiseStatistics()
         {
             _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Tue Jun 25 10:20:31 2013
@@ -163,7 +163,7 @@ class Subscription_1_0 implements Subscr
 
     public boolean isSuspended()
     {
-        return !isActive();// || !getEndpoint().hasCreditToSend();
+        return _link.getSession().getConnectionModel().isStopped() || !isActive();// || !getEndpoint().hasCreditToSend();
 
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Tue Jun 25 10:20:31 2013
@@ -463,7 +463,7 @@ public abstract class SubscriptionImpl i
 
     public boolean isSuspended()
     {
-        return !isActive() || _channel.isSuspended() || _deleted.get();
+        return !isActive() || _channel.isSuspended() || _deleted.get() || _channel.getConnectionModel().isStopped();
     }
 
     /**

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Tue Jun 25 10:20:31 2013
@@ -198,7 +198,7 @@ public class Subscription_0_10 implement
 
     public boolean isSuspended()
     {
-        return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
+        return !isActive() || _deleted.get() || _session.isClosing() || _session.getConnectionModel().isStopped(); // TODO check for Session suspension
     }
 
     public boolean hasInterest(QueueEntry entry)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Tue Jun 25 10:20:31 2013
@@ -74,6 +74,7 @@ public class ServerConnection extends Co
     private Principal _peerPrincipal;
     private NetworkConnection _networkConnection;
     private Transport _transport;
+    private volatile boolean _stopped;
 
     public ServerConnection(final long connectionId)
     {
@@ -169,6 +170,18 @@ public class ServerConnection extends Co
         return _transport;
     }
 
+    @Override
+    public void stop()
+    {
+        _stopped = true;
+    }
+
+    @Override
+    public boolean isStopped()
+    {
+        return _stopped;
+    }
+
     public void setTransport(Transport transport)
     {
         _transport = transport;

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java Tue Jun 25 10:20:31 2013
@@ -553,5 +553,16 @@ public class MockSubscription implements
         {
             return null;
         }
+
+        @Override
+        public void stop()
+        {
+        }
+
+        @Override
+        public boolean isStopped()
+        {
+            return false;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1496401&r1=1496400&r2=1496401&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java Tue Jun 25 10:20:31 2013
@@ -23,6 +23,11 @@ package org.apache.qpid.test.unit.client
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.naming.NamingException;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.AMQConnection;
@@ -45,6 +50,7 @@ public class BrokerClosesClientConnectio
     private Connection _connection;
     private boolean _isExternalBroker;
     private final RecordingExceptionListener _recordingExceptionListener = new RecordingExceptionListener();
+    private Session _session;
 
     @Override
     protected void setUp() throws Exception
@@ -52,7 +58,7 @@ public class BrokerClosesClientConnectio
         super.setUp();
 
         _connection = getConnection();
-        _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         _connection.setExceptionListener(_recordingExceptionListener);
 
         _isExternalBroker = isExternalBroker();
@@ -140,4 +146,70 @@ public class BrokerClosesClientConnectio
             return _exception;
         }
     }
+
+
+    private class Listener implements MessageListener
+    {
+        int _messageCount;
+
+        @Override
+        public synchronized void onMessage(Message message)
+        {
+            _messageCount++;
+        }
+
+        public synchronized int getCount()
+        {
+            return _messageCount;
+        }
+    }
+
+    public void testNoDeliveryAfterBrokerClose() throws JMSException, NamingException, InterruptedException
+    {
+
+        Listener listener = new Listener();
+
+        Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer1 = session.createConsumer(getTestQueue());
+        consumer1.setMessageListener(listener);
+
+        MessageProducer producer = _session.createProducer(getTestQueue());
+        producer.send(_session.createTextMessage("test message"));
+
+        _connection.start();
+
+
+        synchronized (listener)
+        {
+            long currentTime = System.currentTimeMillis();
+            long until = currentTime + 2000l;
+            while(listener.getCount() == 0 && currentTime < until)
+            {
+                listener.wait(until - currentTime);
+                currentTime = System.currentTimeMillis();
+            }
+        }
+        assertEquals(1, listener.getCount());
+
+        Connection connection2 = getConnection();
+        Session session2 = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer2 = session2.createConsumer(getTestQueue());
+        consumer2.setMessageListener(listener);
+        connection2.start();
+
+
+        Connection connection3 = getConnection();
+        Session session3 = connection3.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer3 = session3.createConsumer(getTestQueue());
+        consumer3.setMessageListener(listener);
+        connection3.start();
+
+        assertEquals(1, listener.getCount());
+
+        stopBroker();
+
+        assertEquals(1, listener.getCount());
+
+
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org