You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/11/02 11:19:57 UTC
[1/2] activemq-artemis git commit: This closes #1629
Repository: activemq-artemis
Updated Branches:
refs/heads/master 8703d9d51 -> 5997e21ec
This closes #1629
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5997e21e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5997e21e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5997e21e
Branch: refs/heads/master
Commit: 5997e21ec758841df07aae47ac18e6e3ef6dc078
Parents: 8703d9d 61ce7a7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Nov 2 11:19:35 2017 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Nov 2 11:19:35 2017 +0000
----------------------------------------------------------------------
.../impl/ActiveMQServerControlImpl.java | 1 +
.../management/ActiveMQServerControlTest.java | 32 ++++++++++++++++++++
2 files changed, 33 insertions(+)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1486 Core client should be
notified if consumer is closed on broker side
Posted by ma...@apache.org.
ARTEMIS-1486 Core client should be notified if consumer is closed on broker side
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/61ce7a74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/61ce7a74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/61ce7a74
Branch: refs/heads/master
Commit: 61ce7a74545dd33340a3cf619c1b4ef9e8558bf2
Parents: 8703d9d
Author: Stanislav Knot <sk...@redhat.com>
Authored: Tue Oct 31 14:08:23 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Nov 2 11:19:35 2017 +0000
----------------------------------------------------------------------
.../impl/ActiveMQServerControlImpl.java | 1 +
.../management/ActiveMQServerControlTest.java | 32 ++++++++++++++++++++
2 files changed, 33 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61ce7a74/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 03defb5..b5b5a96 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1621,6 +1621,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
for (ServerConsumer serverConsumer : serverConsumers) {
if (serverConsumer.sequentialID() == Long.valueOf(ID)) {
serverConsumer.close(true);
+ serverConsumer.disconnect();
return true;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/61ce7a74/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index e3725d8..fde247c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -57,6 +57,8 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
@@ -1978,6 +1980,36 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
}
+ @Test
+ public void testCloseConsumer() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString name = RandomUtil.randomSimpleString();
+ boolean durable = true;
+
+ ActiveMQServerControl serverControl = createManagementControl();
+
+ checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+ serverControl.createAddress(address.toString(), "ANYCAST");
+ serverControl.createQueue(address.toString(), "ANYCAST", name.toString(), null, durable, -1, false, false);
+
+ ServerLocator receiveLocator = createInVMNonHALocator();
+ ClientSessionFactory receiveCsf = createSessionFactory(receiveLocator);
+ ClientSession receiveClientSession = receiveCsf.createSession(true, false, false);
+ final ClientConsumer consumer = receiveClientSession.createConsumer(name);
+ final ClientProducer producer = receiveClientSession.createProducer(name);
+
+ ServerSession ss = server.getSessions().iterator().next();
+ ServerConsumer sc = ss.getServerConsumers().iterator().next();
+
+ producer.send(receiveClientSession.createMessage(true));
+ consumer.receive(1000);
+
+ Assert.assertFalse(consumer.isClosed());
+ serverControl.closeConsumerWithID(((ClientSessionImpl)receiveClientSession).getName(), Long.toString(sc.sequentialID()));
+ Wait.waitFor(() -> consumer.isClosed(), 1000, 100);
+ Assert.assertTrue(consumer.isClosed());
+ }
+
protected void scaleDown(ScaleDownHandler handler) throws Exception {
SimpleString address = new SimpleString("testQueue");
HashMap<String, Object> params = new HashMap<>();