You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/10/21 09:56:17 UTC

[14/41] activemq-artemis git commit: ARTEMIS-795 Tests for temporary destinations using dynamic nodes

ARTEMIS-795 Tests for temporary destinations using dynamic nodes

Tests for the management of temporary destinations using the dynamic
node feature.  Failing case, the broker return a source or target that
indicates it will honor the lifetime policy of delete on close but the
temporary destination remains in existence after the link it closed.

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1728abb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1728abb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1728abb

Branch: refs/heads/ARTEMIS-780
Commit: f1728abb5f04bef29f4640114444060c4ff95f24
Parents: 6052f7a
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 18:51:07 2016 -0400
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Oct 14 10:24:13 2016 +0100

----------------------------------------------------------------------
 .../amqp/AmqpTempDestinationTest.java           | 329 +++++++++++++++++++
 1 file changed, 329 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f1728abb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
new file mode 100644
index 0000000..4dbe21e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTempDestinationTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY;
+import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.DeleteOnClose;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for temporary destination handling over AMQP
+ */
+public class AmqpTempDestinationTest extends AmqpClientTestSupport {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AmqpTempDestinationTest.class);
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicSenderToTopic() throws Exception {
+      doTestCreateDynamicSender(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicSenderToQueue() throws Exception {
+      doTestCreateDynamicSender(false);
+   }
+
+   @SuppressWarnings("unchecked")
+   protected void doTestCreateDynamicSender(boolean topic) throws Exception {
+      Target target = createDynamicTarget(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(target);
+      assertNotNull(sender);
+
+      Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+      assertTrue(remoteTarget.getDynamic());
+      assertTrue(remoteTarget.getDurable().equals(TerminusDurability.NONE));
+      assertTrue(remoteTarget.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH));
+
+      // Check the dynamic node lifetime-policy
+      Map<Symbol, Object> dynamicNodeProperties = remoteTarget.getDynamicNodeProperties();
+      assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+      assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
+      Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+      assertNotNull(queueView);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testDynamicSenderLifetimeBoundToLinkTopic() throws Exception {
+      doTestDynamicSenderLifetimeBoundToLinkQueue(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testDynamicSenderLifetimeBoundToLinkQueue() throws Exception {
+      doTestDynamicSenderLifetimeBoundToLinkQueue(false);
+   }
+
+   protected void doTestDynamicSenderLifetimeBoundToLinkQueue(boolean topic) throws Exception {
+      Target target = createDynamicTarget(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(target);
+      assertNotNull(sender);
+
+      Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+      Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+      assertNotNull(queueView);
+
+      sender.close();
+
+      queueView = getProxyToQueue(remoteTarget.getAddress());
+      assertNull(queueView);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicReceiverToTopic() throws Exception {
+      doTestCreateDynamicSender(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicReceiverToQueue() throws Exception {
+      doTestCreateDynamicSender(false);
+   }
+
+   @SuppressWarnings("unchecked")
+   protected void doTestCreateDynamicReceiver(boolean topic) throws Exception {
+      Source source = createDynamicSource(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(source);
+      assertNotNull(receiver);
+
+      Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+      assertTrue(remoteSource.getDynamic());
+      assertTrue(remoteSource.getDurable().equals(TerminusDurability.NONE));
+      assertTrue(remoteSource.getExpiryPolicy().equals(TerminusExpiryPolicy.LINK_DETACH));
+
+      // Check the dynamic node lifetime-policy
+      Map<Symbol, Object> dynamicNodeProperties = remoteSource.getDynamicNodeProperties();
+      assertTrue(dynamicNodeProperties.containsKey(LIFETIME_POLICY));
+      assertEquals(DeleteOnClose.getInstance(), dynamicNodeProperties.get(LIFETIME_POLICY));
+
+      Queue queueView = getProxyToQueue(remoteSource.getAddress());
+      assertNotNull(queueView);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testDynamicReceiverLifetimeBoundToLinkTopic() throws Exception {
+      doTestDynamicReceiverLifetimeBoundToLinkQueue(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testDynamicReceiverLifetimeBoundToLinkQueue() throws Exception {
+      doTestDynamicReceiverLifetimeBoundToLinkQueue(false);
+   }
+
+   protected void doTestDynamicReceiverLifetimeBoundToLinkQueue(boolean topic) throws Exception {
+      Source source = createDynamicSource(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(source);
+      assertNotNull(receiver);
+
+      Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+      Queue queueView = getProxyToQueue(remoteSource.getAddress());
+      assertNotNull(queueView);
+
+      receiver.close();
+
+      queueView = getProxyToQueue(remoteSource.getAddress());
+      assertNull(queueView);
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void TestCreateDynamicQueueSenderAndPublish() throws Exception {
+      doTestCreateDynamicSenderAndPublish(false);
+   }
+
+   @Test(timeout = 60000)
+   public void TestCreateDynamicTopicSenderAndPublish() throws Exception {
+      doTestCreateDynamicSenderAndPublish(true);
+   }
+
+   protected void doTestCreateDynamicSenderAndPublish(boolean topic) throws Exception {
+      Target target = createDynamicTarget(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(target);
+      assertNotNull(sender);
+
+      Target remoteTarget = (Target) sender.getEndpoint().getRemoteTarget();
+      Queue queueView = getProxyToQueue(remoteTarget.getAddress());
+      assertNotNull(queueView);
+
+      // Get the new address
+      String address = sender.getSender().getRemoteTarget().getAddress();
+      LOG.info("New dynamic sender address -> {}", address);
+
+      // Create a message and send to a receive that is listening on the newly
+      // created dynamic link address.
+      AmqpMessage message = new AmqpMessage();
+      message.setMessageId("msg-1");
+      message.setText("Test-Message");
+
+      AmqpReceiver receiver = session.createReceiver(address);
+      receiver.flow(1);
+
+      sender.send(message);
+
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull("Should have read a message", received);
+      received.accept();
+
+      receiver.close();
+      sender.close();
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicReceiverToTopicAndSend() throws Exception {
+      doTestCreateDynamicSender(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testCreateDynamicReceiverToQueueAndSend() throws Exception {
+      doTestCreateDynamicSender(false);
+   }
+
+   protected void doTestCreateDynamicReceiverAndSend(boolean topic) throws Exception {
+      Source source = createDynamicSource(topic);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(source);
+      assertNotNull(receiver);
+
+      Source remoteSource = (Source) receiver.getEndpoint().getRemoteSource();
+      Queue queueView = getProxyToQueue(remoteSource.getAddress());
+      assertNotNull(queueView);
+
+      // Get the new address
+      String address = receiver.getReceiver().getRemoteSource().getAddress();
+      LOG.info("New dynamic receiver address -> {}", address);
+
+      // Create a message and send to a receive that is listening on the newly
+      // created dynamic link address.
+      AmqpMessage message = new AmqpMessage();
+      message.setMessageId("msg-1");
+      message.setText("Test-Message");
+
+      AmqpSender sender = session.createSender(address);
+      sender.send(message);
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull("Should have read a message", received);
+      received.accept();
+
+      sender.close();
+      receiver.close();
+
+      connection.close();
+   }
+
+   protected Source createDynamicSource(boolean topic) {
+
+      Source source = new Source();
+      source.setDynamic(true);
+      source.setDurable(TerminusDurability.NONE);
+      source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+      // Set the dynamic node lifetime-policy
+      Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+      dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+      source.setDynamicNodeProperties(dynamicNodeProperties);
+
+      // Set the capability to indicate the node type being created
+      if (!topic) {
+         source.setCapabilities(TEMP_QUEUE_CAPABILITY);
+      } else {
+         source.setCapabilities(TEMP_TOPIC_CAPABILITY);
+      }
+
+      return source;
+   }
+
+   protected Target createDynamicTarget(boolean topic) {
+
+      Target target = new Target();
+      target.setDynamic(true);
+      target.setDurable(TerminusDurability.NONE);
+      target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+      // Set the dynamic node lifetime-policy
+      Map<Symbol, Object> dynamicNodeProperties = new HashMap<>();
+      dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance());
+      target.setDynamicNodeProperties(dynamicNodeProperties);
+
+      // Set the capability to indicate the node type being created
+      if (!topic) {
+         target.setCapabilities(TEMP_QUEUE_CAPABILITY);
+      } else {
+         target.setCapabilities(TEMP_TOPIC_CAPABILITY);
+      }
+
+      return target;
+   }
+}