You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/07/24 19:39:15 UTC
svn commit: r1613197 - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/model/adapter/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-core/src/test/java/org/apache/qpid/server/model/
systests/src/main/java/org/...
Author: kwall
Date: Thu Jul 24 17:39:15 2014
New Revision: 1613197
URL: http://svn.apache.org/r1613197
Log:
QPID-5915: [Java Broker] Ensure that closing a Connection model object also cause the underlying connection to close too
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java Thu Jul 24 17:39:15 2014
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.AbstractConfiguredObject;
@@ -43,24 +44,29 @@ import org.apache.qpid.server.util.Actio
public final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>,
SessionModelListener
{
- private AMQConnectionModel _connection;
+ private final Action _underlyingConnectionDeleteTask;
+ private final AtomicBoolean _underlyingClosed = new AtomicBoolean(false);
+ private AMQConnectionModel _underlyingConnection;
private State _state = State.ACTIVE;
public ConnectionAdapter(final AMQConnectionModel conn)
{
super(parentsMap(conn.getVirtualHost()),createAttributes(conn));
- _connection = conn;
+ _underlyingConnection = conn;
- conn.addDeleteTask(new Action()
+ // Used to allow the protocol layers to tell the model they have been deleted
+ _underlyingConnectionDeleteTask = new Action()
{
@Override
public void performAction(final Object object)
{
conn.removeDeleteTask(this);
+ _underlyingClosed.set(true);
deleted();
}
- });
+ };
+ conn.addDeleteTask(_underlyingConnectionDeleteTask);
conn.addSessionListener(this);
}
@@ -77,13 +83,13 @@ public final class ConnectionAdapter ext
@Override
public String getClientId()
{
- return _connection.getClientId();
+ return _underlyingConnection.getClientId();
}
@Override
public String getClientVersion()
{
- return _connection.getClientVersion();
+ return _underlyingConnection.getClientVersion();
}
@Override
@@ -101,14 +107,14 @@ public final class ConnectionAdapter ext
@Override
public String getPrincipal()
{
- final Principal authorizedPrincipal = _connection.getAuthorizedPrincipal();
+ final Principal authorizedPrincipal = _underlyingConnection.getAuthorizedPrincipal();
return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
}
@Override
public String getRemoteAddress()
{
- return _connection.getRemoteAddressString();
+ return _underlyingConnection.getRemoteAddressString();
}
@Override
@@ -126,19 +132,19 @@ public final class ConnectionAdapter ext
@Override
public long getSessionCountLimit()
{
- return _connection.getSessionCountLimit();
+ return _underlyingConnection.getSessionCountLimit();
}
@Override
public Transport getTransport()
{
- return _connection.getTransport();
+ return _underlyingConnection.getTransport();
}
@Override
public Port getPort()
{
- return _connection.getPort();
+ return _underlyingConnection.getPort();
}
public Collection<Session> getSessions()
@@ -149,17 +155,22 @@ public final class ConnectionAdapter ext
@StateTransition( currentState = State.ACTIVE, desiredState = State.DELETED)
private void doDelete()
{
- _connection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ closeUnderlyingConnection();
deleted();
_state = State.DELETED;
}
+ @Override
+ protected void onClose()
+ {
+ closeUnderlyingConnection();
+ }
+
public State getState()
{
return _state;
}
-
@Override
public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
@@ -177,37 +188,37 @@ public final class ConnectionAdapter ext
@Override
public long getBytesIn()
{
- return _connection.getDataReceiptStatistics().getTotal();
+ return _underlyingConnection.getDataReceiptStatistics().getTotal();
}
@Override
public long getBytesOut()
{
- return _connection.getDataDeliveryStatistics().getTotal();
+ return _underlyingConnection.getDataDeliveryStatistics().getTotal();
}
@Override
public long getMessagesIn()
{
- return _connection.getMessageReceiptStatistics().getTotal();
+ return _underlyingConnection.getMessageReceiptStatistics().getTotal();
}
@Override
public long getMessagesOut()
{
- return _connection.getMessageDeliveryStatistics().getTotal();
+ return _underlyingConnection.getMessageDeliveryStatistics().getTotal();
}
@Override
public long getLastIoTime()
{
- return _connection.getLastIoTime();
+ return _underlyingConnection.getLastIoTime();
}
@Override
public int getSessionCount()
{
- return _connection.getSessionModels().size();
+ return _underlyingConnection.getSessionModels().size();
}
@Override
@@ -223,4 +234,14 @@ public final class ConnectionAdapter ext
{
// SessionAdapter installs delete task to cause session model object to delete
}
+
+ private void closeUnderlyingConnection()
+ {
+ if (_underlyingClosed.compareAndSet(false, true))
+ {
+ _underlyingConnection.removeDeleteTask(_underlyingConnectionDeleteTask);
+ _underlyingConnection.close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ }
+ }
+
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Jul 24 17:39:15 2014
@@ -1727,13 +1727,13 @@ public abstract class AbstractQueue<X ex
}
}
- private QueueRunner _queueRunner = new QueueRunner(this);
+ private final QueueRunner _queueRunner = new QueueRunner(this);
public void deliverAsync()
{
_stateChangeCount.incrementAndGet();
- _queueRunner.execute(_asyncDelivery);
+ _queueRunner.execute();
}
@@ -1746,7 +1746,7 @@ public abstract class AbstractQueue<X ex
else
{
SubFlushRunner flusher = sub.getRunner();
- flusher.execute(_asyncDelivery);
+ flusher.execute();
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java Thu Jul 24 17:39:15 2014
@@ -114,12 +114,12 @@ public class QueueRunner implements Runn
return "QueueRunner-" + _queue.getLogSubject();
}
- public void execute(Executor executor)
+ public void execute()
{
_stateChange.set(true);
if(_scheduled.compareAndSet(IDLE, SCHEDULED))
{
- executor.execute(this);
+ _queue.execute(this);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java Thu Jul 24 17:39:15 2014
@@ -112,12 +112,12 @@ class SubFlushRunner implements Runnable
return "SubFlushRunner-" + _sub.toLogString();
}
- public void execute(Executor executor)
+ public void execute()
{
_stateChange.set(true);
if(_scheduled.compareAndSet(IDLE,SCHEDULED))
{
- executor.execute(this);
+ getQueue().execute(this);
}
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java Thu Jul 24 17:39:15 2014
@@ -31,14 +31,19 @@ import static org.mockito.Mockito.verify
import static org.mockito.Mockito.when;
import java.security.AccessControlException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.mockito.ArgumentMatcher;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.connection.IConnectionRegistry;
+import org.apache.qpid.server.connection.IConnectionRegistry.RegistryChangeListener;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
@@ -152,6 +157,57 @@ public class VirtualHostTest extends Qpi
verify(_configStore, times(2)).update(eq(false), matchesRecord(virtualHost.getId(), virtualHost.getType()));
}
+ public void testStopVirtualHost_ClosesConnections()
+ {
+ String virtualHostName = getName();
+
+ VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+ AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+
+ assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size());
+
+ ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+
+ assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(
+ Connection.class).size());
+
+ virtualHost.stop();
+ assertEquals("Unexpected state", State.STOPPED, virtualHost.getState());
+
+ assertEquals("Unexpected number of connections after virtualhost stopped",
+ 0,
+ virtualHost.getChildren(Connection.class).size());
+
+ verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ }
+
+ public void testDeleteVirtualHost_ClosesConnections()
+ {
+ String virtualHostName = getName();
+
+ VirtualHost<?, ?, ?> virtualHost = createVirtualHost(virtualHostName);
+ assertEquals("Unexpected state", State.ACTIVE, virtualHost.getState());
+
+ AMQConnectionModel connection = createMockProtocolConnection(virtualHost);
+
+ assertEquals("Unexpected number of connections before connection registered", 0, virtualHost.getChildren(Connection.class).size());
+
+ ((RegistryChangeListener)virtualHost).connectionRegistered(connection);
+
+ assertEquals("Unexpected number of connections after connection registered", 1, virtualHost.getChildren(Connection.class).size());
+
+ virtualHost.delete();
+ assertEquals("Unexpected state", State.DELETED, virtualHost.getState());
+
+ assertEquals("Unexpected number of connections after virtualhost deleted",
+ 0,
+ virtualHost.getChildren(Connection.class).size());
+
+ verify(connection).close(AMQConstant.CONNECTION_FORCED, "Connection closed by external action");
+ }
+
public void testCreateDurableQueue()
{
String virtualHostName = getName();
@@ -273,6 +329,14 @@ public class VirtualHostTest extends Qpi
return host;
}
+ private AMQConnectionModel createMockProtocolConnection(final VirtualHost<?, ?, ?> virtualHost)
+ {
+ final AMQConnectionModel connection = mock(AMQConnectionModel.class);
+ when(connection.getVirtualHost()).thenReturn(virtualHost);
+ when(connection.getRemoteAddressString()).thenReturn("peer:1234");
+ return connection;
+ }
+
private static ConfiguredObjectRecord matchesRecord(UUID id, String type)
{
return argThat(new MinimalConfiguredObjectRecordMatcher(id, type));
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java?rev=1613197&r1=1613196&r2=1613197&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/channelclose/CloseWithBlockingReceiveTest.java Thu Jul 24 17:39:15 2014
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.test.unit.client.channelclose;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
@@ -34,22 +34,12 @@ public class CloseWithBlockingReceiveTes
{
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
-
public void testReceiveReturnsNull() throws Exception
{
- final AMQConnection connection = (AMQConnection) getConnection("guest", "guest");
+ final Connection connection = getConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(new AMQTopic(connection, "banana"));
+ Destination destination = session.createQueue(getTestQueueName());
+ MessageConsumer consumer = session.createConsumer(destination);
connection.start();
Runnable r = new Runnable()
@@ -68,14 +58,16 @@ public class CloseWithBlockingReceiveTes
}
};
long startTime = System.currentTimeMillis();
- new Thread(r).start();
- consumer.receive(10000);
- assertTrue(System.currentTimeMillis() - startTime < 10000);
- }
-
- public static junit.framework.Test suite()
- {
- return new junit.framework.TestSuite(CloseWithBlockingReceiveTest.class);
+ Thread thread = new Thread(r);
+ thread.start();
+ try
+ {
+ consumer.receive(10000);
+ assertTrue(System.currentTimeMillis() - startTime < 10000);
+ }
+ finally
+ {
+ thread.join();
+ }
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org