You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/04/19 04:50:36 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1123 Clean up and add new
AMQP tests
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
index 689c23c..748f10a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -36,101 +36,170 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
public void testSendWithDeliveryTimeIsScheduled() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ try {
+ AmqpSession session = connection.createSession();
- // Get the Queue View early to avoid racing the delivery.
- final Queue queueView = getProxyToQueue(getTestName());
- assertNotNull(queueView);
+ AmqpSender sender = session.createSender(getQueueName());
- AmqpMessage message = new AmqpMessage();
- long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
- message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
- message.setText("Test-Message");
- sender.send(message);
- sender.close();
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getQueueName());
+ assertNotNull(queueView);
- assertEquals(1, queueView.getScheduledCount());
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
- // Now try and get the message
- AmqpReceiver receiver = session.createReceiver(getTestName());
- receiver.flow(1);
- AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
- assertNull(received);
+ assertEquals(1, queueView.getScheduledCount());
- connection.close();
+ // Now try and get the message
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNull(received);
+ } finally {
+ connection.close();
+ }
}
@Test(timeout = 60000)
public void testSendRecvWithDeliveryTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ try {
+ AmqpSession session = connection.createSession();
- // Get the Queue View early to avoid racing the delivery.
- final Queue queueView = getProxyToQueue(getTestName());
- assertNotNull(queueView);
+ AmqpSender sender = session.createSender(getQueueName());
- AmqpMessage message = new AmqpMessage();
- long deliveryTime = System.currentTimeMillis() + 6000;
- message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
- message.setText("Test-Message");
- sender.send(message);
- sender.close();
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getQueueName());
+ assertNotNull(queueView);
- assertEquals(1, queueView.getScheduledCount());
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + 6000;
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
- AmqpReceiver receiver = session.createReceiver(getTestName());
- receiver.flow(1);
+ assertEquals(1, queueView.getScheduledCount());
- // Now try and get the message, should not due to being scheduled.
- AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
- assertNull(received);
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ receiver.flow(1);
- // Now try and get the message, should get it now
- received = receiver.receive(10, TimeUnit.SECONDS);
- assertNotNull(received);
- received.accept();
+ // Now try and get the message, should not due to being scheduled.
+ AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+ assertNull(received);
- connection.close();
+ // Now try and get the message, should get it now
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+ } finally {
+ connection.close();
+ }
}
@Test
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ try {
+ AmqpSession session = connection.createSession();
- // Get the Queue View early to avoid racing the delivery.
- final Queue queueView = getProxyToQueue(getTestName());
- assertNotNull(queueView);
+ AmqpSender sender = session.createSender(getQueueName());
- AmqpMessage message = new AmqpMessage();
- long delay = 6000;
- message.setMessageAnnotation("x-opt-delivery-delay", delay);
- message.setText("Test-Message");
- sender.send(message);
- sender.close();
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getQueueName());
+ assertNotNull(queueView);
- assertEquals(1, queueView.getScheduledCount());
+ AmqpMessage message = new AmqpMessage();
+ long delay = 6000;
+ message.setMessageAnnotation("x-opt-delivery-delay", delay);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
- AmqpReceiver receiver = session.createReceiver(getTestName());
- receiver.flow(1);
+ assertEquals(1, queueView.getScheduledCount());
- // Now try and get the message, should not due to being scheduled.
- AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
- assertNull(received);
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ receiver.flow(1);
- // Now try and get the message, should get it now
- received = receiver.receive(10, TimeUnit.SECONDS);
- assertNotNull(received);
- received.accept();
+ // Now try and get the message, should not due to being scheduled.
+ AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+ assertNull(received);
- connection.close();
+ // Now try and get the message, should get it now
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
+ AmqpClient client = createAmqpClient();
+ assertNotNull(client);
+
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getQueueName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+
+ // Now try and get the message
+ receiver.flow(1);
+
+ // Shouldn't get this since we delayed the message.
+ assertNull(receiver.receive(1, TimeUnit.SECONDS));
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
+ AmqpClient client = createAmqpClient();
+ assertNotNull(client);
+
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getQueueName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + 2000;
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+
+ // Now try and get the message
+ receiver.flow(1);
+
+ AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+ Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
+ assertNotNull(msgDeliveryTime);
+ assertEquals(deliveryTime, msgDeliveryTime.longValue());
+ } finally {
+ connection.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
index e2b80f5..8e41d71 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -22,11 +22,11 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -57,7 +57,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
HashSet<Role> value = new HashSet<>();
value.add(new Role("none", false, true, true, true, true, true, true, true));
- securityRepository.addMatch(getTestName(), value);
+ securityRepository.addMatch(getQueueName(), value);
serverManager = new JMSServerManagerImpl(server);
Configuration serverConfig = server.getConfiguration();
@@ -135,7 +135,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setMessageId("msg" + 1);
@@ -154,8 +154,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
- server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTestName()), RoutingType.ANYCAST));
- server.createQueue(new SimpleString(getTestName()), RoutingType.ANYCAST, new SimpleString(getTestName()), null, true, false);
+ server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
+ server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false);
AmqpClient client = createAmqpClient(user1, password1);
AmqpConnection connection = client.connect();
@@ -165,7 +165,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
AmqpSender sender = session.createAnonymousSender();
AmqpMessage message = new AmqpMessage();
- message.setAddress(getTestName());
+ message.setAddress(getQueueName());
message.setMessageId("msg" + 1);
message.setText("Test-Message");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 54b361c..9cf256a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -69,9 +69,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
- Queue queue = getProxyToQueue(getTestName());
+ Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
receiver.close();
@@ -84,9 +84,9 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
- sendMessages(getTestName(), 10);
+ sendMessages(getQueueName(), 10);
for (int i = 0; i < 10; i++) {
receiver.flow(1);
@@ -98,7 +98,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
- Queue queue = getProxyToQueue(getTestName());
+ Queue queue = getProxyToQueue(getQueueName());
assertNotNull(queue);
assertEquals(0, queue.getMessageCount());
}
@@ -130,7 +130,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- session.createReceiver(getTestName(), "JMSPriority > 8");
+ session.createReceiver(getQueueName(), "JMSPriority > 8");
connection.getStateInspector().assertValid();
connection.close();
@@ -163,7 +163,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- session.createReceiver(getTestName(), null, true);
+ session.createReceiver(getQueueName(), null, true);
connection.getStateInspector().assertValid();
connection.close();
@@ -177,7 +177,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
try {
- session.createReceiver(getTestName(), "null = 'f''", true);
+ session.createReceiver(getQueueName(), "null = 'f''", true);
fail("should throw exception");
} catch (Exception e) {
assertTrue(e.getCause() instanceof JMSException);
@@ -189,15 +189,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
- sendMessages(getTestName(), 1);
+ sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
- Queue queueView = getProxyToQueue(getTestName());
+ Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@@ -211,11 +211,11 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testQueueReceiverReadMessageWithDivert() throws Exception {
- final String forwardingAddress = getTestName() + "Divert";
+ final String forwardingAddress = getQueueName() + "Divert";
final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
- server.getActiveMQServerControl().createDivert("name", "routingName", getTestName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
- sendMessages(getTestName(), 1);
+ server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
+ sendMessages(getQueueName(), 1);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
@@ -313,15 +313,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
- sendMessages(getTestName(), 1, false);
+ sendMessages(getQueueName(), 1, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
- Queue queueView = getProxyToQueue(getTestName());
+ Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@@ -337,15 +337,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageDurableTrue() throws Exception {
- sendMessages(getTestName(), 1, true);
+ sendMessages(getQueueName(), 1, true);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
- Queue queueView = getProxyToQueue(getTestName());
+ Queue queueView = getProxyToQueue(getQueueName());
assertEquals(1, queueView.getMessageCount());
receiver.flow(1);
@@ -362,22 +362,22 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
- sendMessages(getTestName(), MSG_COUNT);
+ sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
- Queue queueView = getProxyToQueue(getTestName());
+ Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount());
@@ -398,15 +398,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
int MSG_COUNT = 4;
- sendMessages(getTestName(), MSG_COUNT);
+ sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
- final Queue queueView = getProxyToQueue(getTestName());
+ final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(2);
@@ -425,7 +425,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(2, server.getTotalConsumerCount());
@@ -456,15 +456,15 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
int MSG_COUNT = 20;
- sendMessages(getTestName(), MSG_COUNT);
+ sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
- final Queue queueView = getProxyToQueue(getTestName());
+ final Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
receiver1.flow(20);
@@ -479,7 +479,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver1.close();
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
assertEquals(1, server.getTotalConsumerCount());
@@ -513,7 +513,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
@@ -525,7 +525,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read message with receiver");
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
assertNotNull("Should have read message", received);
@@ -544,7 +544,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@@ -560,17 +560,17 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
- Queue queue = getProxyToQueue(getTestName());
+ Queue queue = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queue.getMessageCount());
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(MSG_COUNT);
AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received);
assertEquals("msg0", received.getMessageId());
receiver1.close();
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(200);
for (int i = 0; i < MSG_COUNT; ++i) {
received = receiver2.receive(5, TimeUnit.SECONDS);
@@ -597,12 +597,12 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.setGroupId("hijklm");
message2.setApplicationProperty("sn", 200);
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
sender.send(message1);
sender.send(message2);
sender.close();
- AmqpReceiver receiver = session.createReceiver(getTestName(), "sn = 100");
+ AmqpReceiver receiver = session.createReceiver(getQueueName(), "sn = 100");
receiver.flow(2);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have read a message", received);
@@ -624,7 +624,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@@ -639,7 +639,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
LOG.info("Attempting to read first two messages with receiver #1");
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(2);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
AmqpMessage message2 = receiver1.receive(10, TimeUnit.SECONDS);
@@ -651,7 +651,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.accept();
LOG.info("Attempting to read next two messages with receiver #2");
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(2);
AmqpMessage message3 = receiver2.receive(10, TimeUnit.SECONDS);
AmqpMessage message4 = receiver2.receive(10, TimeUnit.SECONDS);
@@ -685,7 +685,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < MSG_COUNT; i++) {
AmqpMessage message = new AmqpMessage();
@@ -699,10 +699,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
sender.close();
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
receiver1.flow(1);
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ AmqpReceiver receiver2 = session.createReceiver(getQueueName());
receiver2.flow(1);
AmqpMessage message1 = receiver1.receive(10, TimeUnit.SECONDS);
@@ -759,7 +759,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- final String address = getTestName();
+ final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address);
@@ -793,7 +793,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
final CountDownLatch receiverReady = new CountDownLatch(1);
ExecutorService executorService = Executors.newCachedThreadPool();
- final String address = getTestName();
+ final String address = getQueueName();
executorService.submit(new Runnable() {
@Override
@@ -858,10 +858,10 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ AmqpReceiver receiver1 = session.createReceiver(getQueueName());
- Queue queue = getProxyToQueue(getTestName());
+ Queue queue = getProxyToQueue(getQueueName());
// Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage();
@@ -904,7 +904,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- final String address = getTestName();
+ final String address = getQueueName();
AmqpReceiver receiver = session.createReceiver(address);
AmqpSender sender = session.createSender(address);
@@ -957,7 +957,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- final String address = getTestName();
+ final String address = getQueueName();
AmqpSender sender = session.createSender(address);
AmqpReceiver receiver1 = session.createReceiver(address, null, false, true);
@@ -1036,7 +1036,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender("queue://" + getTestName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+ AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
assertNotNull(sender);
connection.getStateInspector().assertValid();
@@ -1047,7 +1047,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageWithToFieldSetToSenderAddress() throws Exception {
- doTestMessageWithToFieldSet(false, getTestName());
+ doTestMessageWithToFieldSet(false, getQueueName());
}
@Test(timeout = 60000)
@@ -1067,7 +1067,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageWithToFieldSetWithAnonymousSender() throws Exception {
- doTestMessageWithToFieldSet(true, getTestName());
+ doTestMessageWithToFieldSet(true, getQueueName());
}
private void doTestMessageWithToFieldSet(boolean anonymous, String expected) throws Exception {
@@ -1075,7 +1075,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- final String address = getTestName();
+ final String address = getQueueName();
AmqpSender sender = session.createSender(anonymous ? null : address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
index 8f89452..a360eb8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.artemis.tests.integration.amqp;
import java.net.URI;
@@ -25,8 +24,12 @@ import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.junit.After;
-/** This will only add methods to support AMQP Testing without creating servers or anything */
+/**
+ * Base test support class providing client support methods to aid in
+ * creating and configuring the AMQP test client.
+ */
public class AmqpTestSupport extends ActiveMQTestBase {
+
protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected boolean useSSL;
@@ -121,7 +124,4 @@ public class AmqpTestSupport extends ActiveMQTestBase {
public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
return new AmqpClient(brokerURI, username, password);
}
-
-
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
index 3a9d498..3b231fa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTransactionTest.java
@@ -94,7 +94,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
assertNotNull(session);
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
sender.setStateInspector(new AmqpValidator() {
@Override
@@ -148,8 +148,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- final Queue queue = getProxyToQueue(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ final Queue queue = getProxyToQueue(getQueueName());
session.begin();
@@ -173,8 +173,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- final Queue queue = getProxyToQueue(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ final Queue queue = getProxyToQueue(getQueueName());
session.begin();
@@ -198,8 +198,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- final Queue queue = getProxyToQueue(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@@ -207,7 +207,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@@ -230,8 +230,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- final Queue queue = getProxyToQueue(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@@ -239,7 +239,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@@ -253,7 +253,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
connection = addConnection(client.connect());
session = connection.createSession();
- receiver = session.createReceiver(getTestName());
+ receiver = session.createReceiver(getQueueName());
session.begin();
receiver.flow(1);
@@ -274,8 +274,8 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
- final Queue queue = getProxyToQueue(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
+ final Queue queue = getProxyToQueue(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
@@ -283,7 +283,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
assertEquals(1, queue.getMessageCount());
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
session.begin();
@@ -308,7 +308,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
@@ -326,11 +326,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
- AmqpReceiver receiver1 = session1.createReceiver(getTestName());
- AmqpReceiver receiver2 = session2.createReceiver(getTestName());
- AmqpReceiver receiver3 = session3.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
+ AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
+ AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
- final Queue queue = getProxyToQueue(getTestName());
+ final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@@ -365,7 +365,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Load up the Queue with some messages
{
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
@@ -383,11 +383,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
- AmqpReceiver receiver1 = session1.createReceiver(getTestName());
- AmqpReceiver receiver2 = session2.createReceiver(getTestName());
- AmqpReceiver receiver3 = session3.createReceiver(getTestName());
+ AmqpReceiver receiver1 = session1.createReceiver(getQueueName());
+ AmqpReceiver receiver2 = session2.createReceiver(getQueueName());
+ AmqpReceiver receiver3 = session3.createReceiver(getQueueName());
- final Queue queue = getProxyToQueue(getTestName());
+ final Queue queue = getProxyToQueue(getQueueName());
assertEquals(3, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@@ -428,11 +428,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
- AmqpSender sender1 = session1.createSender(getTestName());
- AmqpSender sender2 = session2.createSender(getTestName());
- AmqpSender sender3 = session3.createSender(getTestName());
+ AmqpSender sender1 = session1.createSender(getQueueName());
+ AmqpSender sender2 = session2.createSender(getQueueName());
+ AmqpSender sender3 = session3.createSender(getQueueName());
- final Queue queue = getProxyToQueue(getTestName());
+ final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@@ -468,11 +468,11 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session3 = connection.createSession();
// Sender linked to each session
- AmqpSender sender1 = session1.createSender(getTestName());
- AmqpSender sender2 = session2.createSender(getTestName());
- AmqpSender sender3 = session3.createSender(getTestName());
+ AmqpSender sender1 = session1.createSender(getQueueName());
+ AmqpSender sender2 = session2.createSender(getQueueName());
+ AmqpSender sender3 = session3.createSender(getQueueName());
- final Queue queue = getProxyToQueue(getTestName());
+ final Queue queue = getProxyToQueue(getQueueName());
assertEquals(0, queue.getMessageCount());
// Begin the transaction that all senders will operate in.
@@ -509,7 +509,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
// Commit TXN work from a sender.
txnSession.begin();
@@ -538,7 +538,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
txnSession.commit();
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(NUM_MESSAGES * 2);
for (int i = 0; i < NUM_MESSAGES * 2; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
@@ -563,7 +563,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
@@ -573,7 +573,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
@@ -629,7 +629,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
@@ -639,7 +639,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@@ -700,7 +700,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES + 1; ++i) {
AmqpMessage message = new AmqpMessage();
@@ -710,7 +710,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
ArrayList<AmqpMessage> messages = new ArrayList<>(NUM_MESSAGES);
receiver.flow((NUM_MESSAGES + 2) * 2);
for (int i = 0; i < NUM_MESSAGES; ++i) {
@@ -787,7 +787,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
// Normal Session which won't create an TXN itself
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
for (int i = 0; i < NUM_MESSAGES; ++i) {
AmqpMessage message = new AmqpMessage();
@@ -797,7 +797,7 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
}
// Read all messages from the Queue, do not accept them yet.
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(2);
AmqpMessage message1 = receiver.receive(5, TimeUnit.SECONDS);
AmqpMessage message2 = receiver.receive(5, TimeUnit.SECONDS);
@@ -930,12 +930,12 @@ public class AmqpTransactionTest extends AmqpClientTestSupport {
AmqpSession session = connection.createSession();
assertNotNull(session);
- AmqpSender sender = session.createSender(getTestName());
+ AmqpSender sender = session.createSender(getQueueName());
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
sender.send(message);
- AmqpReceiver receiver = session.createReceiver(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.setStateInspector(new AmqpValidator() {
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0260a304/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 4ee94c2..7933fec 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -16,6 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -41,26 +57,10 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.AddressControl;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -84,17 +84,14 @@ import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.junit.After;
@@ -104,18 +101,12 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DELAYED_DELIVERY;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PRODUCT;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.VERSION;
-import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.contains;
-
@RunWith(Parameterized.class)
public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
- private static final String brokerName = "localhost";
private static final long maxSizeBytes = 1 * 1024 * 1024;
@@ -371,132 +362,6 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
- public void testBrokerContainerId() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection amqpConnection = client.connect();
- try {
- assertTrue(brokerName.equals(amqpConnection.getEndpoint().getRemoteContainer()));
- } finally {
- amqpConnection.close();
- }
- }
-
- @Test
- public void testBrokerConnectionProperties() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection amqpConnection = client.connect();
- try {
- Map<Symbol, Object> properties = amqpConnection.getEndpoint().getRemoteProperties();
- assertTrue(properties != null);
- if (properties != null) {
- assertTrue("apache-activemq-artemis".equals(properties.get(Symbol.valueOf("product"))));
- assertTrue(VersionLoader.getVersion().getFullVersion().equals(properties.get(Symbol.valueOf("version"))));
- }
- } finally {
- amqpConnection.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testConnectionCarriesExpectedCapabilities() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- assertNotNull(client);
-
- client.setValidator(new AmqpValidator() {
-
- @Override
- public void inspectOpenedResource(org.apache.qpid.proton.engine.Connection connection) {
-
- Symbol[] offered = connection.getRemoteOfferedCapabilities();
-
- if (!contains(offered, DELAYED_DELIVERY)) {
- markAsInvalid("Broker did not indicate it support delayed message delivery");
- return;
- }
-
- Map<Symbol, Object> properties = connection.getRemoteProperties();
- if (!properties.containsKey(PRODUCT)) {
- markAsInvalid("Broker did not send a queue product name value");
- return;
- }
-
- if (!properties.containsKey(VERSION)) {
- markAsInvalid("Broker did not send a queue version value");
- return;
- }
- }
- });
-
- AmqpConnection connection = client.connect();
- try {
- assertNotNull(connection);
- connection.getStateInspector().assertValid();
- } finally {
- connection.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testSendWithDeliveryTimeHoldsMessage() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- assertNotNull(client);
-
- AmqpConnection connection = client.connect();
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender(address);
- AmqpReceiver receiver = session.createReceiver(address);
-
- AmqpMessage message = new AmqpMessage();
- long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
- message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
- message.setText("Test-Message");
- sender.send(message);
-
- // Now try and get the message
- receiver.flow(1);
-
- // Shouldn't get this since we delayed the message.
- assertNull(receiver.receive(1, TimeUnit.SECONDS));
- } finally {
- connection.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testSendWithDeliveryTimeDeliversMessageAfterDelay() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- assertNotNull(client);
-
- AmqpConnection connection = client.connect();
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender(address);
- AmqpReceiver receiver = session.createReceiver(address);
-
- AmqpMessage message = new AmqpMessage();
- long deliveryTime = System.currentTimeMillis() + 2000;
- message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
- message.setText("Test-Message");
- sender.send(message);
-
- // Now try and get the message
- receiver.flow(1);
-
- AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
- assertNotNull(received);
- received.accept();
- Long msgDeliveryTime = (Long) received.getMessageAnnotation("x-opt-delivery-time");
- assertNotNull(msgDeliveryTime);
- assertEquals(deliveryTime, msgDeliveryTime.longValue());
- } finally {
- connection.close();
- }
- }
-
- @Test
public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
String destinationAddress = address + 1;
@@ -984,41 +849,6 @@ public class ProtonTest extends ProtonTestBase {
}
@Test
- public void testManagementQueryOverAMQP() throws Throwable {
-
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection amqpConnection = client.connect();
- try {
- String destinationAddress = address + 1;
- AmqpSession session = amqpConnection.createSession();
- AmqpSender sender = session.createSender("activemq.management");
- AmqpReceiver receiver = session.createReceiver(destinationAddress);
- receiver.flow(10);
-
- //create request message for getQueueNames query
- AmqpMessage request = new AmqpMessage();
- request.setApplicationProperty("_AMQ_ResourceName", ResourceNames.BROKER);
- request.setApplicationProperty("_AMQ_OperationName", "getQueueNames");
- request.setReplyToAddress(destinationAddress);
- request.setText("[]");
-
- sender.send(request);
- AmqpMessage response = receiver.receive(5, TimeUnit.SECONDS);
- Assert.assertNotNull(response);
- assertNotNull(response);
- Object section = response.getWrappedMessage().getBody();
- assertTrue(section instanceof AmqpValue);
- Object value = ((AmqpValue) section).getValue();
- assertTrue(value instanceof String);
- assertTrue(((String) value).length() > 0);
- assertTrue(((String) value).contains(destinationAddress));
- response.accept();
- } finally {
- amqpConnection.close();
- }
- }
-
- @Test
public void testReplyTo() throws Throwable {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue queue = session.createTemporaryQueue();
@@ -1792,93 +1622,6 @@ public class ProtonTest extends ProtonTestBase {
}
}
- @Test(timeout = 60000)
- public void testSendMessageOnAnonymousRelayLinkUsingMessageTo() throws Exception {
-
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection connection = client.connect();
-
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createAnonymousSender();
- AmqpMessage message = new AmqpMessage();
-
- message.setAddress(address);
- message.setMessageId("msg" + 1);
- message.setText("Test-Message");
-
- sender.send(message);
- sender.close();
-
- AmqpReceiver receiver = session.createReceiver(address);
- receiver.flow(1);
- AmqpMessage received = receiver.receive(10, TimeUnit.SECONDS);
- assertNotNull("Should have read message", received);
- assertEquals("msg1", received.getMessageId());
- received.accept();
-
- receiver.close();
- } finally {
- connection.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testSendMessageFailsOnAnonymousRelayLinkWhenNoToValueSet() throws Exception {
-
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection connection = client.connect();
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createAnonymousSender();
- AmqpMessage message = new AmqpMessage();
-
- message.setMessageId("msg" + 1);
- message.setText("Test-Message");
-
- try {
- sender.send(message);
- fail("Should not be able to send, message should be rejected");
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- sender.close();
- }
- } finally {
- connection.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testSendMessageFailsOnAnonymousRelayWhenToFieldHasNonExistingAddress() throws Exception {
-
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection connection = client.connect();
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createAnonymousSender();
- AmqpMessage message = new AmqpMessage();
-
- message.setAddress(address + "-not-in-service");
- message.setMessageId("msg" + 1);
- message.setText("Test-Message");
-
- try {
- sender.send(message);
- fail("Should not be able to send, message should be rejected");
- } catch (Exception ex) {
- ex.printStackTrace();
- } finally {
- sender.close();
- }
- } finally {
- connection.close();
- }
- }
-
private javax.jms.Queue createQueue(String address) throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {