You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/10/13 21:53:35 UTC
[1/2] activemq-artemis git commit: ARTEMIS-792 Add additional tests
for AMQP protocol
Repository: activemq-artemis
Updated Branches:
refs/heads/master 0a1e6bdd5 -> 9743043fb
ARTEMIS-792 Add additional tests for AMQP protocol
Adds several tests for AMQP expectations in various use cases.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7a8b7e9c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7a8b7e9c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7a8b7e9c
Branch: refs/heads/master
Commit: 7a8b7e9cfba8d4a8adfe96ee99d8a9df1273ac18
Parents: 0a1e6bd
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 12:16:28 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 13 16:07:50 2016 -0400
----------------------------------------------------------------------
.../integration/amqp/AmqpClientTestSupport.java | 67 ++---
.../amqp/AmqpDeliveryAnnotationsTest.java | 64 +++++
.../amqp/AmqpDescribedTypePayloadTest.java | 151 ++++++++++
.../integration/amqp/AmqpReceiverDrainTest.java | 165 +++++++++++
.../amqp/AmqpScheduledMessageTest.java | 136 +++++++++
.../integration/amqp/AmqpSendReceiveTest.java | 279 ++++++++++++++++++-
6 files changed, 823 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 2c7ce6f..14f9b61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -21,8 +21,12 @@ import java.net.URI;
import java.util.LinkedList;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -35,9 +39,11 @@ import org.junit.Before;
*/
public class AmqpClientTestSupport extends ActiveMQTestBase {
- ActiveMQServer server;
+ private boolean useSSL;
- LinkedList<AmqpConnection> connections = new LinkedList<>();
+ protected JMSServerManager serverManager;
+ protected ActiveMQServer server;
+ protected LinkedList<AmqpConnection> connections = new LinkedList<>();
protected AmqpConnection addConnection(AmqpConnection connection) {
connections.add(connection);
@@ -48,9 +54,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
@Override
public void setUp() throws Exception {
super.setUp();
-
- server = createServer(true, true);
- server.start();
+ server = createServer();
}
@After
@@ -63,18 +67,36 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
ignored.printStackTrace();
}
}
+
+ if (serverManager != null) {
+ try {
+ serverManager.stop();
+ } catch (Throwable ignored) {
+ ignored.printStackTrace();
+ }
+ serverManager = null;
+ }
+
server.stop();
super.tearDown();
}
+ protected ActiveMQServer createServer() throws Exception {
+ ActiveMQServer server = createServer(true, true);
+ serverManager = new JMSServerManagerImpl(server);
+ Configuration serverConfig = server.getConfiguration();
+ serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
+ serverConfig.setSecurityEnabled(false);
+ serverManager.start();
+ server.start();
+ return server;
+ }
+
public Queue getProxyToQueue(String queueName) {
return server.locateQueue(SimpleString.toSimpleString(queueName));
}
- private String connectorScheme = "amqp";
- private boolean useSSL;
-
public String getTestName() {
return "jms.queue." + getName();
}
@@ -83,14 +105,9 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
}
public AmqpClientTestSupport(String connectorScheme, boolean useSSL) {
- this.connectorScheme = connectorScheme;
this.useSSL = useSSL;
}
- public String getConnectorScheme() {
- return connectorScheme;
- }
-
public boolean isUseSSL() {
return useSSL;
}
@@ -99,30 +116,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
return "";
}
- protected boolean isUseTcpConnector() {
- return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
- }
-
- protected boolean isUseSslConnector() {
- return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
- }
-
- protected boolean isUseNioConnector() {
- return !isUseSSL() && connectorScheme.contains("nio");
- }
-
- protected boolean isUseNioPlusSslConnector() {
- return isUseSSL() && connectorScheme.contains("nio");
- }
-
- protected boolean isUseWsConnector() {
- return !isUseSSL() && connectorScheme.contains("ws");
- }
-
- protected boolean isUseWssConnector() {
- return isUseSSL() && connectorScheme.contains("wss");
- }
-
public URI getBrokerAmqpConnectionURI() {
boolean webSocket = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
new file mode 100644
index 0000000..93ff22b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDeliveryAnnotationsTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test around the handling of Delivery Annotations in messages sent and received.
+ */
+public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
+
+ private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
+
+ @Test(timeout = 60000)
+ public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+
+ message.setText("Test-Message");
+ message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
+
+ sender.send(message);
+ receiver.flow(1);
+
+ Queue queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getMessageCount());
+
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
+
+ sender.close();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
new file mode 100644
index 0000000..bbb9c26
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDescribedTypePayloadTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpNoLocalFilter;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test that the broker can pass through an AMQP message with a described type in the message
+ * body regardless of transformer in use.
+ */
+public class AmqpDescribedTypePayloadTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testSendMessageWithDescribedTypeInBody() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+
+ Queue queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getMessageCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ assertNotNull(received.getDescribedType());
+ receiver.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendMessageWithDescribedTypeInBodyReceiveOverOpenWire() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+ connection.close();
+
+ Queue queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getMessageCount());
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ Connection jmsConnection = factory.createConnection();
+ try {
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = jmsSession.createQueue(getName());
+ MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+ jmsConnection.start();
+
+ Message received = jmsConsumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof BytesMessage);
+ } finally {
+ jmsConnection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testDescribedTypeMessageRoundTrips() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ // Send with AMQP client.
+ AmqpSender sender = session.createSender(getTestName());
+ AmqpMessage message = new AmqpMessage();
+ message.setDescribedType(new AmqpNoLocalFilter());
+ sender.send(message);
+ sender.close();
+
+ Queue queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getMessageCount());
+
+ // Receive and resend with OpenWire JMS client
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
+ Connection jmsConnection = factory.createConnection();
+ try {
+ Session jmsSession = jmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination destination = jmsSession.createQueue(getName());
+ MessageConsumer jmsConsumer = jmsSession.createConsumer(destination);
+ jmsConnection.start();
+
+ Message received = jmsConsumer.receive(5000);
+ assertNotNull(received);
+ assertTrue(received instanceof BytesMessage);
+
+ MessageProducer jmsProducer = jmsSession.createProducer(destination);
+ jmsProducer.send(received);
+ } finally {
+ jmsConnection.close();
+ }
+
+ assertEquals(1, queue.getMessageCount());
+
+ // Now let's receive it with AMQP and see that we get back what we expected.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+ AmqpMessage returned = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(returned);
+ assertNotNull(returned.getDescribedType());
+ receiver.close();
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
new file mode 100644
index 0000000..1af9028
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Tests various behaviors of broker side drain support.
+ */
+public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testReceiverCanDrainMessages() throws Exception {
+ int MSG_COUNT = 20;
+ sendMessages(getTestName(), MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+ receiver.drain(MSG_COUNT);
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ }
+ receiver.close();
+
+ assertEquals(0, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testPullWithNoMessageGetDrained() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ receiver.flow(10);
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(0, queueView.getMessageCount());
+ assertEquals(0, queueView.getDeliveringCount());
+
+ assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+ assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+ assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testPullOneFromRemote() throws Exception {
+ int MSG_COUNT = 20;
+ sendMessages(getTestName(), MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+ assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+ AmqpMessage message = receiver.pull(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+
+ assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+ receiver.close();
+
+ assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
+ assertEquals(1, queueView.getMessagesAcknowledged());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleZeroResultPulls() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ receiver.flow(10);
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(0, queueView.getMessageCount());
+
+ assertEquals(10, receiver.getReceiver().getRemoteCredit());
+
+ assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+ assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+ assertNull(receiver.pull(1, TimeUnit.SECONDS));
+ assertNull(receiver.pull(1, TimeUnit.SECONDS));
+
+ assertEquals(0, receiver.getReceiver().getRemoteCredit());
+
+ connection.close();
+ }
+
+ public void sendMessages(String destinationName, int count) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = null;
+
+ try {
+ connection = client.connect();
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message-" + i);
+ sender.send(message);
+ }
+
+ sender.close();
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
new file mode 100644
index 0000000..689c23c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test for scheduled message support using AMQP message annotations.
+ */
+public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testSendWithDeliveryTimeIsScheduled() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getTestName());
+ assertNotNull(queueView);
+
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2);
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
+
+ assertEquals(1, queueView.getScheduledCount());
+
+ // Now try and get the message
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ assertNull(received);
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testSendRecvWithDeliveryTime() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getTestName());
+ assertNotNull(queueView);
+
+ AmqpMessage message = new AmqpMessage();
+ long deliveryTime = System.currentTimeMillis() + 6000;
+ message.setMessageAnnotation("x-opt-delivery-time", deliveryTime);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
+
+ assertEquals(1, queueView.getScheduledCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+
+ // Now try to get the message; it should not arrive yet because it is still scheduled.
+ AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+ assertNull(received);
+
+ // Now try and get the message, should get it now
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ connection.close();
+ }
+
+ @Test
+ public void testScheduleWithDelay() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getTestName());
+
+ // Get the Queue View early to avoid racing the delivery.
+ final Queue queueView = getProxyToQueue(getTestName());
+ assertNotNull(queueView);
+
+ AmqpMessage message = new AmqpMessage();
+ long delay = 6000;
+ message.setMessageAnnotation("x-opt-delivery-delay", delay);
+ message.setText("Test-Message");
+ sender.send(message);
+ sender.close();
+
+ assertEquals(1, queueView.getScheduledCount());
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ receiver.flow(1);
+
+ // Now try to get the message; it should not arrive yet because it is still scheduled.
+ AmqpMessage received = receiver.receive(2, TimeUnit.SECONDS);
+ assertNull(received);
+
+ // Now try and get the message, should get it now
+ received = receiver.receive(10, TimeUnit.SECONDS);
+ assertNotNull(received);
+ received.accept();
+
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7a8b7e9c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 6597a62..6c50b86 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,22 +16,31 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.engine.Receiver;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +53,255 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
@Test(timeout = 60000)
+ public void testCreateQueueReceiver() throws Exception { // Checks that a queue named after the test can be looked up once a receiver is attached.
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ // Create a receiver using the test name as its address.
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+ // getProxyToQueue is defined outside this hunk; presumably it returns the broker-side queue, or null if there is none.
+ Queue queue = getProxyToQueue(getTestName());
+ assertNotNull(queue);
+ // Close the link before the connection that carries it.
+ receiver.close();
+ connection.close();
+ }
+
+ /**
+ * Attaches a receiver with a JMS selector and checks, through an
+ * {@link AmqpValidator}, that the source reported by the remote peer
+ * carries a JMS selector filter.
+ *
+ * @throws Exception if connecting, attaching or validation fails
+ */
+ @Test(timeout = 60000)
+ public void testCreateQueueReceiverWithJMSSelector() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ client.setValidator(new AmqpValidator() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inspectOpenedResource(Receiver receiver) {
+ Source source = (Source) receiver.getRemoteSource();
+
+ if (source == null) {
+ markAsInvalid("Link opened with null source.");
+ // Without a source there are no filters to inspect. Returning here
+ // avoids a NullPointerException that would mask the failure above.
+ return;
+ }
+
+ Map<Symbol, Object> filters = source.getFilter();
+
+ if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
+ markAsInvalid("Broker did not return the JMS Filter on Attach");
+ }
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ // The second argument is the selector the validator expects to see echoed as a filter.
+ session.createReceiver(getTestName(), "JMSPriority > 8");
+
+ // Expected to fail the test if the validator above marked the attach as invalid.
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+
+ /**
+ * Attaches a receiver to a queue with the third createReceiver argument set
+ * to true (presumably the noLocal flag, going by the test name) and checks,
+ * through an {@link AmqpValidator}, that the source reported by the remote
+ * peer does NOT carry a NoLocal filter.
+ *
+ * @throws Exception if connecting, attaching or validation fails
+ */
+ @Test(timeout = 60000)
+ public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ client.setValidator(new AmqpValidator() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inspectOpenedResource(Receiver receiver) {
+ Source source = (Source) receiver.getRemoteSource();
+
+ if (source == null) {
+ markAsInvalid("Link opened with null source.");
+ // Without a source there are no filters to inspect. Returning here
+ // avoids a NullPointerException that would mask the failure above.
+ return;
+ }
+
+ Map<Symbol, Object> filters = source.getFilter();
+
+ // noLocal is currently not supported on a Queue, so the filter must be
+ // absent; finding it is the failure case.
+ if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
+ markAsInvalid("Broker should not return the NoLocal Filter on Attach for a Queue");
+ }
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ // No selector (null); the trailing true is presumably the noLocal flag.
+ session.createReceiver(getTestName(), null, true);
+
+ // Expected to fail the test if the validator above marked the attach as invalid.
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+
+ @Test(timeout = 60000) // Receives one message without settling it and checks the queue still reports one message.
+ public void testQueueReceiverReadMessage() throws Exception {
+ sendMessages(getTestName(), 1);
+ // sendMessages uses and closes its own connection; open a new one for receiving.
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+ // Flow one credit and take the message; accept() is never called on it.
+ receiver.flow(1);
+ assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+ receiver.close();
+ // No disposition was sent, so the message is expected to remain counted on the queue.
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000) // Two receivers on one session each read two of four messages without settling any of them.
+ public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
+ int MSG_COUNT = 4;
+ sendMessages(getTestName(), MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+ // First receiver reads two messages and sends no disposition for them.
+ receiver1.flow(2);
+ assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+ assertNotNull(receiver1.receive(5, TimeUnit.SECONDS));
+ // Second receiver is attached on the same session while the first is still open.
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+ assertEquals(2, server.getTotalConsumerCount());
+
+ receiver2.flow(2);
+ assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+ assertNotNull(receiver2.receive(5, TimeUnit.SECONDS));
+ // accept() was never called, so the acknowledged counter must still be zero.
+ assertEquals(0, queueView.getMessagesAcknowledged());
+
+ receiver1.close();
+ receiver2.close();
+ // With both links closed, the queue must still report every message that was sent.
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000) // Two receivers on one session each read and accept two of four messages; the queue must end up empty.
+ public void testTwoQueueReceiversOnSameConnectionReadMessagesAcceptOnEach() throws Exception {
+ int MSG_COUNT = 4;
+ sendMessages(getTestName(), MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ // Declared final so the anonymous Wait.Condition classes below can capture it.
+ final Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+ // First receiver reads and accepts two messages.
+ receiver1.flow(2);
+ AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ message = receiver1.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ // Poll the acknowledged counter rather than asserting immediately; presumably the broker applies accept() asynchronously.
+ assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return queueView.getMessagesAcknowledged() == 2;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50))); // presumably (timeout, poll interval) in ms; confirm against Wait.waitFor
+
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+ assertEquals(2, server.getTotalConsumerCount());
+ // Second receiver reads and accepts the remaining two messages.
+ receiver2.flow(2);
+ message = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ message = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+
+ assertTrue("Queue should be empty now", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return queueView.getMessagesAcknowledged() == 4; // NOTE(review): literal duplicates MSG_COUNT
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(10)));
+
+ receiver1.close();
+ receiver2.close();
+ // All four messages were accepted, so none may remain on the queue.
+ assertEquals(0, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000) // A receiver is given credit and closed without settling; a second receiver must then read and accept messages.
+ public void testSecondReceiverOnQueueGetsAllUnconsumedMessages() throws Exception {
+ int MSG_COUNT = 20;
+ sendMessages(getTestName(), MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver1 = session.createReceiver(getTestName());
+ // Declared final so the anonymous Wait.Condition classes below can capture it.
+ final Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(MSG_COUNT, queueView.getMessageCount());
+ // Give the first receiver credit, but never call receive() or accept() on it.
+ receiver1.flow(20); // NOTE(review): literal duplicates MSG_COUNT
+
+ assertTrue("Should have dispatch to prefetch", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return queueView.getDeliveringCount() >= 2; // waits only for at least two in-flight deliveries, not all MSG_COUNT
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+ // Close the first receiver without having settled anything.
+ receiver1.close();
+
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+ // Only the second receiver should be counted as a consumer now.
+ assertEquals(1, server.getTotalConsumerCount());
+ // Credit is twice the message count, so credit is not what limits delivery to receiver2.
+ receiver2.flow(MSG_COUNT * 2);
+ AmqpMessage message = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+ message = receiver2.receive(5, TimeUnit.SECONDS);
+ assertNotNull(message);
+ message.accept();
+
+ assertTrue("Should have ack'd two", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return queueView.getMessagesAcknowledged() == 2;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(50)));
+
+ receiver2.close();
+ // Two messages were accepted; the rest must still be counted on the queue.
+ assertEquals(MSG_COUNT - 2, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testSimpleSendOneReceiveOne() throws Exception {
AmqpClient client = createAmqpClient();
@@ -476,7 +734,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertTrue("Should be no inflight messages: " + destinationView.getDeliveringCount(), Wait.waitFor(new Wait.Condition() {
@Override
- public boolean isSatisified() throws Exception {
+ public boolean isSatisfied() throws Exception {
return destinationView.getDeliveringCount() == 0;
}
}));
@@ -554,4 +812,21 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
+
+ public void sendMessages(String destinationName, int count) throws Exception { // Sends count messages to destinationName over a connection of its own.
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+ // Each message id is "MessageID:" followed by the zero-based send index.
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("MessageID:" + i);
+ sender.send(message);
+ }
+ } finally {
+ connection.close(); // always close the connection, even if a send throws
+ }
+ }
}
[2/2] activemq-artemis git commit: This closes #837
Posted by ta...@apache.org.
This closes #837
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9743043f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9743043f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9743043f
Branch: refs/heads/master
Commit: 9743043fb81be47ba6f6cc2bdb029b46f772624f
Parents: 0a1e6bd 7a8b7e9
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 13 17:53:05 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 13 17:53:05 2016 -0400
----------------------------------------------------------------------
.../integration/amqp/AmqpClientTestSupport.java | 67 ++---
.../amqp/AmqpDeliveryAnnotationsTest.java | 64 +++++
.../amqp/AmqpDescribedTypePayloadTest.java | 151 ++++++++++
.../integration/amqp/AmqpReceiverDrainTest.java | 165 +++++++++++
.../amqp/AmqpScheduledMessageTest.java | 136 +++++++++
.../integration/amqp/AmqpSendReceiveTest.java | 279 ++++++++++++++++++-
6 files changed, 823 insertions(+), 39 deletions(-)
----------------------------------------------------------------------