You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/07/24 19:39:15 UTC

svn commit: r1613197 - in /qpid/trunk/qpid/java: broker-core/src/main/java/org/apache/qpid/server/model/adapter/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-core/src/test/java/org/apache/qpid/server/model/ systests/src/main/java/org/...

Author: kwall
Date: Thu Jul 24 17:39:15 2014
New Revision: 1613197

URL: http://svn.apache.org/r1613197
Log:
QPID-5915: [Java Broker] Ensure that closing a Connection model object also causes the underlying connection to close

Modified:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Thu Jul 24 17:39:15 2014
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -43,24 +44,29 @@ import org.apache.qpid.server.util.Actio
 public final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>,
                                                                                              SessionModelListener
 {
-    private AMQConnectionModel _connection;
+    private final Action _underlyingConnectionDeleteTask;
+    private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
+    private AMQConnectionModel _underlyingConnection;
 
     private State _state = State.ACTIVE;
 
     public ConnectionAdapter(final AMQConnectionModel conn)
     {
         super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
-        _connection = conn;
+        _underlyingConnection = conn;
 
-        conn.addDeleteTask(new Action()
+        // Used to allow the protocol layers to tell the model they have been deleted
+        _underlyingConnectionDeleteTask = new Action()
         {
             @Override
             public void performAction(final Object object)
             {
                 conn.removeDeleteTask(this);
+                _underlyingClosed.set(true);
                 deleted();
             }
-        });
+        };
+        conn.addDeleteTask(_underlyingConnectionDeleteTask);
 
         conn.addSessionListener(this);
     }
@@ -77,13 +83,13 @@ public final class ConnectionAdapter ext
     @Override
     public String getClientId()
     {
-        return _connection.getClientId();
+        return _underlyingConnection.getClientId();
     }
 
     @Override
     public String getClientVersion()
     {
-        return _connection.getClientVersion();
+        return _underlyingConnection.getClientVersion();
     }
 
     @Override
@@ -101,14 +107,14 @@ public final class ConnectionAdapter ext
     @Override
     public String getPrincipal()
     {
-        final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal();
+        final Principal authorizedPrincipal = _underlyingConnection.getAuthorizedPrincipal();
         return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
     }
 
     @Override
     public String getRemoteAddress()
     {
-        return _connection.getRemoteAddressString();
+        return _underlyingConnection.getRemoteAddressString();
     }
 
     @Override
@@ -126,19 +132,19 @@ public final class ConnectionAdapter ext
     @Override
     public long getSessionCountLimit()
     {
-        return _connection.getSessionCountLimit();
+        return _underlyingConnection.getSessionCountLimit();
     }
 
     @Override
     public Transport getTransport()
     {
-        return _connection.getTransport();
+        return _underlyingConnection.getTransport();
     }
 
     @Override
     public Port getPort()
     {
-        return _connection.getPort();
+        return _underlyingConnection.getPort();
     }
 
     public Collection<Session> getSessions()
@@ -149,17 +155,22 @@ public final class ConnectionAdapter ext
     @StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
     private void doDelete()
     {
-        _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+        closeUnderlyingConnection();
         deleted();
         _state = State.DELETED;
     }
 
+    @Override
+    protected void onClose()
+    {
+        closeUnderlyingConnection();
+    }
+
     public State getState()
     {
         return _state;
     }
 
-
     @Override
     public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
     {
@@ -177,37 +188,37 @@ public final class ConnectionAdapter ext
     @Override
     public long getBytesIn()
     {
-        return _connection.getDataReceiptStatistics().getTotal();
+        return _underlyingConnection.getDataReceiptStatistics().getTotal();
     }
 
     @Override
     public long getBytesOut()
     {
-        return _connection.getDataDeliveryStatistics().getTotal();
+        return _underlyingConnection.getDataDeliveryStatistics().getTotal();
     }
 
     @Override
     public long getMessagesIn()
     {
-        return _connection.getMessageReceiptStatistics().getTotal();
+        return _underlyingConnection.getMessageReceiptStatistics().getTotal();
     }
 
     @Override
     public long getMessagesOut()
     {
-        return _connection.getMessageDeliveryStatistics().getTotal();
+        return _underlyingConnection.getMessageDeliveryStatistics().getTotal();
     }
 
     @Override
     public long getLastIoTime()
     {
-        return _connection.getLastIoTime();
+        return _underlyingConnection.getLastIoTime();
     }
 
     @Override
     public int getSessionCount()
     {
-        return _connection.getSessionModels().size();
+        return _underlyingConnection.getSessionModels().size();
     }
 
     @Override
@@ -223,4 +234,14 @@ public final class ConnectionAdapter ext
     {
         // SessionAdapter installs delete task to cause session model object to delete
     }
+
+    private void closeUnderlyingConnection()
+    {
+        if (_underlyingClosed.compareAndSet(false, true))
+        {
+            _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
+            _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+        }
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Jul 24 17:39:15 2014
@@ -1727,13 +1727,13 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private QueueRunner _queueRunner = new QueueRunner(this);
+    private final QueueRunner _queueRunner = new QueueRunner(this);
 
     public void deliverAsync()
     {
         _stateChangeCount.incrementAndGet();
 
-        _queueRunner.execute(_asyncDelivery);
+        _queueRunner.execute();
 
     }
 
@@ -1746,7 +1746,7 @@ public abstract class AbstractQueue<X ex
         else
         {
             SubFlushRunner flusher = sub.getRunner();
-            flusher.execute(_asyncDelivery);
+            flusher.execute();
         }
 
     }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Jul 24 17:39:15 2014
@@ -114,12 +114,12 @@ public class QueueRunner implements Runn
         return "QueueRunner-" + _queue.getLogSubject();
     }
 
-    public void execute(Executor executor)
+    public void execute()
     {
         _stateChange.set(true);
         if(_scheduled.compareAndSet(IDLE, SCHEDULED))
         {
-            executor.execute(this);
+            _queue.execute(this);
         }
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Jul 24 17:39:15 2014
@@ -112,12 +112,12 @@ class SubFlushRunner implements Runnable
         return "SubFlushRunner-" + _sub.toLogString();
     }
 
-    public void execute(Executor executor)
+    public void execute()
     {
         _stateChange.set(true);
         if(_scheduled.compareAndSet(IDLE,SCHEDULED))
         {
-            executor.execute(this);
+            getQueue().execute(this);
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Jul 24 17:39:15 2014
@@ -31,14 +31,19 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.security.AccessControlException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
 import org.mockito.ArgumentMatcher;
 
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.security.access.Operation;
 import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -152,6 +157,57 @@ public class VirtualHostTest extends Qpi
         verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
     }
 
+    public void testStopVirtualHost_ClosesConnections()
+    {
+        String virtualHostName = getName();
+
+        VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
+        assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+        AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+
+        assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size());
+
+        ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+
+        assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(
+                Connection.class).size());
+
+        virtualHost.stop();
+        assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+
+        assertEquals("Unexpected number of connections after virtualhost stopped",
+                     0,
+                     virtualHost.getChildren(Connection.class).size());
+
+        verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+    }
+
+    public void testDeleteVirtualHost_ClosesConnections()
+    {
+        String virtualHostName = getName();
+
+        VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
+        assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+        AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+
+        assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size());
+
+        ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+
+        assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(Connection.class).size());
+
+        virtualHost.delete();
+        assertEquals("Unexpected state", State.DELETED, virtualHost.getState());
+
+        assertEquals("Unexpected number of connections after virtualhost deleted",
+                     0,
+                     virtualHost.getChildren(Connection.class).size());
+
+        verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+    }
+
     public void testCreateDurableQueue()
     {
         String virtualHostName = getName();
@@ -273,6 +329,14 @@ public class VirtualHostTest extends Qpi
         return host;
     }
 
+    private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
+    {
+        final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+        when(connection.getVirtualHost()).thenReturn(virtualHost);
+        when(connection.getRemoteAddressString()).thenReturn("peer:1234");
+        return connection;
+    }
+
     private static ConfiguredObjectRecord matchesRecord(UUID id, String type)
     {
         return argThat(new MinimalConfiguredObjectRecordMatcher(id, type));

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java Thu Jul 24 17:39:15 2014
@@ -20,10 +20,10 @@
  */
 package org.apache.qpid.test.unit.client.channelclose;
 
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
+import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.MessageConsumer;
 import javax.jms.Session;
 
@@ -34,22 +34,12 @@ public class CloseWithBlockingReceiveTes
 {
 
 
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-    }
-
-    protected void tearDown() throws Exception
-    {
-        super.tearDown();
-    }
-
-
     public void testReceiveReturnsNull() throws Exception
     {
-        final AMQConnection connection =  (AMQConnection) getConnection("guest", "guest");
+        final Connection connection =  getConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana"));
+        Destination destination = session.createQueue(getTestQueueName());
+        MessageConsumer consumer = session.createConsumer(destination);
         connection.start();
 
         Runnable r = new Runnable()
@@ -68,14 +58,16 @@ public class CloseWithBlockingReceiveTes
             }
         };
         long startTime = System.currentTimeMillis();
-        new Thread(r).start();
-        consumer.receive(10000);
-        assertTrue(System.currentTimeMillis() - startTime < 10000);
-    }
-
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+        Thread thread = new Thread(r);
+        thread.start();
+        try
+        {
+            consumer.receive(10000);
+            assertTrue(System.currentTimeMillis() - startTime < 10000);
+        }
+        finally
+        {
+            thread.join();
+        }
     }
-
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org