You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/10/21 09:56:17 UTC
[14/41] activemq-artemis git commit: ARTEMIS-795 Tests for temporary
destinations using dynamic nodes
ARTEMIS-795 Tests for temporary destinations using dynamic nodes
Tests for the management of temporary destinations using the dynamic
node feature. Failing case, the broker return a source or target that
indicates it will honor the lifetime policy of delete on close but the
temporary destination remains in existence after the link it closed.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1728abb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1728abb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1728abb
Branch: refs/heads/ARTEMIS-780
Commit: f1728abb5f04bef29f4640114444060c4ff95f24
Parents: 6052f7a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 18:51:07 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Oct 14 10:24:13 2016 +0100
----------------------------------------------------------------------
.../amqp/AmqpTempDestinationTest.java | 329 +++++++++++++++++++
1 file changed, 329 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1728abb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
new file mode 100644
index 0000000..4dbe21e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for temporary destination handling over AMQP
+ */
+public class AmqpTempDestinationTest extends AmqpClientTestSupport {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AmqpTempDestinationTest.class);
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicSenderToTopic() throws Exception {
+ doTestCreateDynamicSender(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicSenderToQueue() throws Exception {
+ doTestCreateDynamicSender(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void doTestCreateDynamicSender(boolean topic) throws Exception {
+ Target target = createDynamicTarget(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(target);
+ assertNotNull(sender);
+
+ Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+ assertTrue(remoteTarget.getDynamic());
+ assertTrue(remoteTarget.getDurable().equals(TerminusDurability.NONE));
+ assertTrue(remoteTarget.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH));
+
+ // Check the dynamic node lifetime-policy
+ Map<Symbol, Object> dynamicNodeProperties = remoteTarget.getDynamicNodeProperties();
+ assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+ assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
+ Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+ assertNotNull(queueView);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception {
+ doTestDynamicSenderLifetimeBoundToLinkQueue(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception {
+ doTestDynamicSenderLifetimeBoundToLinkQueue(false);
+ }
+
+ protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception {
+ Target target = createDynamicTarget(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(target);
+ assertNotNull(sender);
+
+ Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+ Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+ assertNotNull(queueView);
+
+ sender.close();
+
+ queueView = getProxyToQueue(remoteTarget.getAddress());
+ assertNull(queueView);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicReceiverToTopic() throws Exception {
+ doTestCreateDynamicSender(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicReceiverToQueue() throws Exception {
+ doTestCreateDynamicSender(false);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
+ Source source = createDynamicSource(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(source);
+ assertNotNull(receiver);
+
+ Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+ assertTrue(remoteSource.getDynamic());
+ assertTrue(remoteSource.getDurable().equals(TerminusDurability.NONE));
+ assertTrue(remoteSource.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH));
+
+ // Check the dynamic node lifetime-policy
+ Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
+ assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+ assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
+ Queue queueView = getProxyToQueue(remoteSource.getAddress());
+ assertNotNull(queueView);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testDynamicReceiverLifetimeBoundToLinkTopic() throws Exception {
+ doTestDynamicReceiverLifetimeBoundToLinkQueue(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testDynamicReceiverLifetimeBoundToLinkQueue() throws Exception {
+ doTestDynamicReceiverLifetimeBoundToLinkQueue(false);
+ }
+
+ protected void doTestDynamicReceiverLifetimeBoundToLinkQueue(boolean topic) throws Exception {
+ Source source = createDynamicSource(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(source);
+ assertNotNull(receiver);
+
+ Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+ Queue queueView = getProxyToQueue(remoteSource.getAddress());
+ assertNotNull(queueView);
+
+ receiver.close();
+
+ queueView = getProxyToQueue(remoteSource.getAddress());
+ assertNull(queueView);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void TestCreateDynamicQueueSenderAndPublish() throws Exception {
+ doTestCreateDynamicSenderAndPublish(false);
+ }
+
+ @Test(timeout = 60000)
+ public void TestCreateDynamicTopicSenderAndPublish() throws Exception {
+ doTestCreateDynamicSenderAndPublish(true);
+ }
+
+ protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception {
+ Target target = createDynamicTarget(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(target);
+ assertNotNull(sender);
+
+ Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+ Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+ assertNotNull(queueView);
+
+ // Get the new address
+ String address = sender.getSender().getRemoteTarget().getAddress();
+ LOG.info("New dynamic sender address -> {}", address);
+
+ // Create a message and send to a receive that is listening on the newly
+ // created dynamic link address.
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg-1");
+ message.setText("Test-Message");
+
+ AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(1);
+
+ sender.send(message);
+
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", received);
+ received.accept();
+
+ receiver.close();
+ sender.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicReceiverToTopicAndSend() throws Exception {
+ doTestCreateDynamicSender(true);
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateDynamicReceiverToQueueAndSend() throws Exception {
+ doTestCreateDynamicSender(false);
+ }
+
+ protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception {
+ Source source = createDynamicSource(topic);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(source);
+ assertNotNull(receiver);
+
+ Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+ Queue queueView = getProxyToQueue(remoteSource.getAddress());
+ assertNotNull(queueView);
+
+ // Get the new address
+ String address = receiver.getReceiver().getRemoteSource().getAddress();
+ LOG.info("New dynamic receiver address -> {}", address);
+
+ // Create a message and send to a receive that is listening on the newly
+ // created dynamic link address.
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg-1");
+ message.setText("Test-Message");
+
+ AmqpSender sender = session.createSender(address);
+ sender.send(message);
+
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("Should have read a message", received);
+ received.accept();
+
+ sender.close();
+ receiver.close();
+
+ connection.close();
+ }
+
+ protected Source createDynamicSource(boolean topic) {
+
+ Source source = new Source();
+ source.setDynamic(true);
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+ // Set the dynamic node lifetime-policy
+ Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+ dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+ source.setDynamicNodeProperties(dynamicNodeProperties);
+
+ // Set the capability to indicate the node type being created
+ if (!topic) {
+ source.setCapabilities(TEMP_QUEUE_CAPABILITY);
+ } else {
+ source.setCapabilities(TEMP_TOPIC_CAPABILITY);
+ }
+
+ return source;
+ }
+
+ protected Target createDynamicTarget(boolean topic) {
+
+ Target target = new Target();
+ target.setDynamic(true);
+ target.setDurable(TerminusDurability.NONE);
+ target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+ // Set the dynamic node lifetime-policy
+ Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+ dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+ target.setDynamicNodeProperties(dynamicNodeProperties);
+
+ // Set the capability to indicate the node type being created
+ if (!topic) {
+ target.setCapabilities(TEMP_QUEUE_CAPABILITY);
+ } else {
+ target.setCapabilities(TEMP_TOPIC_CAPABILITY);
+ }
+
+ return target;
+ }
+}