You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/04/28 09:17:03 UTC
[6/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite
refactoring
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
new file mode 100644
index 0000000..778cd40
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMaxFrameSizeTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.junit.Test;
+
+public class AmqpMaxFrameSizeTest extends AmqpClientTestSupport {
+
+ private static final int FRAME_SIZE = 512;
+
+ @Override
+ protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+ params.put("maxFrameSize", FRAME_SIZE);
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleTransfers() throws Exception {
+
+ String testQueueName = "ConnectionFrameSize";
+ int nMsgs = 200;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ connection.connect();
+
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(testQueueName);
+
+ final int payload = FRAME_SIZE * 16;
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = createAmqpMessage((byte) 'A', payload);
+ sender.send(message);
+ }
+
+ int count = getMessageCount(server.getPostOffice(), testQueueName);
+ assertEquals(nMsgs, count);
+
+ AmqpReceiver receiver = session.createReceiver(testQueueName);
+ receiver.flow(nMsgs);
+
+ for (int i = 0; i < nMsgs; ++i) {
+ AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull("failed at " + i, message);
+ MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
+ Data data = (Data) wrapped.getBody();
+ System.out.println("received : message: " + data.getValue().getLength());
+ assertEquals(payload, data.getValue().getLength());
+ message.accept();
+ }
+
+ } finally {
+ connection.close();
+ }
+ }
+
+ private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
+ AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[payloadSize];
+ for (int i = 0; i < payload.length; i++) {
+ payload[i] = value;
+ }
+ message.setBytes(payload);
+ return message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
new file mode 100644
index 0000000..f895c86
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageDivertsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+public class AmqpMessageDivertsTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testQueueReceiverReadMessageWithDivert() throws Exception {
+ final String forwardingAddress = getQueueName() + "Divert";
+ final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
+ server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
+ server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
+
+ sendMessages(getQueueName(), 1);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(forwardingAddress);
+
+ Queue queueView = getProxyToQueue(forwardingAddress);
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
new file mode 100644
index 0000000..fcce0ab
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessageRoutingTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.Test;
+
+public class AmqpMessageRoutingTest extends JMSClientTestSupport {
+
+ @Override
+ protected boolean isAutoCreateQueues() {
+ return false;
+ }
+
+ @Override
+ protected boolean isAutoCreateAddresses() {
+ return false;
+ }
+
+ @Test(timeout = 60000)
+ public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages("anycast://" + addressA, 1);
+
+ assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages(addressA, 1, RoutingType.ANYCAST);
+
+ assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages("multicast://" + addressA, 1);
+
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+ assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception {
+ final String addressA = "addressA";
+ final String queueA = "queueA";
+ final String queueB = "queueB";
+ final String queueC = "queueC";
+
+ ActiveMQServerControl serverControl = server.getActiveMQServerControl();
+ serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
+ serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
+ serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
+
+ sendMessages(addressA, 1, RoutingType.MULTICAST);
+
+ assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
+ assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
+ }
+
+ /**
+ * If we have an address configured with both ANYCAST and MULTICAST routing types enabled, we must ensure that any
+ * messages sent specifically to MULTICAST (e.g. JMS TopicProducer) are only delivered to MULTICAST queues (e.g.
+ * i.e. subscription queues) and **NOT** to ANYCAST queues (e.g. JMS Queue).
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testRoutingExclusivity() throws Exception {
+
+ // Create Address with both ANYCAST and MULTICAST enabled
+ String testAddress = "testRoutingExclusivity-mixed-mode";
+ SimpleString ssTestAddress = new SimpleString(testAddress);
+
+ AddressInfo addressInfo = new AddressInfo(ssTestAddress);
+ addressInfo.addRoutingType(RoutingType.MULTICAST);
+ addressInfo.addRoutingType(RoutingType.ANYCAST);
+
+ server.addAddressInfo(addressInfo);
+ server.createQueue(ssTestAddress, RoutingType.ANYCAST, ssTestAddress, null, true, false);
+
+ Connection connection = createConnection(UUIDGenerator.getInstance().generateStringUUID());
+
+ try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic topic = session.createTopic(testAddress);
+ javax.jms.Queue queue = session.createQueue(testAddress);
+
+ MessageProducer producer = session.createProducer(topic);
+
+ MessageConsumer queueConsumer = session.createConsumer(queue);
+ MessageConsumer topicConsumer = session.createConsumer(topic);
+
+ producer.send(session.createTextMessage("testMessage"));
+
+ assertNotNull(topicConsumer.receive(1000));
+ assertNull(queueConsumer.receive(1000));
+ } finally {
+ connection.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
new file mode 100644
index 0000000..3d8be49
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpOutboundConnectionTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
+import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
+import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testOutboundConnection() throws Throwable {
+ final ActiveMQServer remote = createServer(AMQP_PORT + 1);
+ remote.start();
+ try {
+ Wait.waitFor(remote::isActive);
+ } catch (Exception e) {
+ remote.stop();
+ throw e;
+ }
+
+ final Map<String, Object> config = new LinkedHashMap<>();
+ config.put(TransportConstants.HOST_PROP_NAME, "localhost");
+ config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
+ ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
+ ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
+ NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
+ connector.start();
+ connector.createConnection();
+
+ try {
+ Wait.waitFor(() -> remote.getConnectionCount() > 0);
+ assertEquals(1, remote.getConnectionCount());
+
+ lifeCycleListener.stop();
+
+ Wait.waitFor(() -> remote.getConnectionCount() == 0);
+ assertEquals(0, remote.getConnectionCount());
+ } finally {
+ lifeCycleListener.stop();
+ remote.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
index 422e23e..3fd21b1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -192,7 +193,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
message.setText("Test-Message");
sender.send(message);
- assertEquals(1, queue.getMessageCount());
+ assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
@@ -228,7 +229,7 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
message.setText("Test-Message");
sender.send(message);
- assertEquals(1, queue.getMessageCount());
+ assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
AmqpReceiver receiver = session.createReceiver(getQueueName(), null, false, true);
@@ -250,21 +251,4 @@ public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
-
- public void sendMessages(String destinationName, int count) throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- try {
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
-
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- sender.send(message);
- }
- } finally {
- connection.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
new file mode 100644
index 0000000..e16fd46
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpProtocolHeaderHandlingTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.fusesource.hawtbuf.Buffer;
+import org.junit.Test;
+
+public class AmqpProtocolHeaderHandlingTest extends AmqpClientTestSupport {
+
+ @Override
+ protected boolean isSecurityEnabled() {
+ return true;
+ }
+
+ @Test(timeout = 60000)
+ public void testNonSaslHeaderRejectedOnConnect() throws Exception {
+ final AmqpHeader header = new AmqpHeader();
+
+ header.setProtocolId(0);
+ header.setMajor(1);
+ header.setMinor(0);
+ header.setRevision(0);
+
+ final ClientConnection connection = new ClientConnection();
+ connection.open("localhost", AMQP_PORT);
+ connection.send(header);
+
+ AmqpHeader response = connection.readAmqpHeader();
+ assertNotNull(response);
+ assertEquals(3, response.getProtocolId());
+ IntegrationTestLogger.LOGGER.info("Broker responded with: " + response);
+
+ // pump some bytes down the wire until broker closes the connection
+ assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisfied() throws Exception {
+ try {
+ connection.send(header);
+ return false;
+ } catch (Exception e) {
+ return true;
+ }
+ }
+ }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
+ }
+
+ private class ClientConnection {
+
+ protected static final long RECEIVE_TIMEOUT = 10000;
+ protected Socket clientSocket;
+
+ public void open(String host, int port) throws IOException {
+ clientSocket = new Socket(host, port);
+ clientSocket.setTcpNoDelay(true);
+ }
+
+ public void send(AmqpHeader header) throws Exception {
+ IntegrationTestLogger.LOGGER.info("Client sending header: " + header);
+ OutputStream outputStream = clientSocket.getOutputStream();
+ header.getBuffer().writeTo(outputStream);
+ outputStream.flush();
+ }
+
+ public AmqpHeader readAmqpHeader() throws Exception {
+ clientSocket.setSoTimeout((int) RECEIVE_TIMEOUT);
+ InputStream is = clientSocket.getInputStream();
+
+ byte[] header = new byte[8];
+ int read = is.read(header);
+ if (read == header.length) {
+ return new AmqpHeader(new Buffer(header));
+ } else {
+ return null;
+ }
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private class AmqpHeader {
+
+ final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'});
+
+ private Buffer buffer;
+
+ AmqpHeader() {
+ this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0}));
+ }
+
+ AmqpHeader(Buffer buffer) {
+ this(buffer, true);
+ }
+
+ AmqpHeader(Buffer buffer, boolean validate) {
+ setBuffer(buffer, validate);
+ }
+
+ public int getProtocolId() {
+ return buffer.get(4) & 0xFF;
+ }
+
+ public void setProtocolId(int value) {
+ buffer.data[buffer.offset + 4] = (byte) value;
+ }
+
+ public int getMajor() {
+ return buffer.get(5) & 0xFF;
+ }
+
+ public void setMajor(int value) {
+ buffer.data[buffer.offset + 5] = (byte) value;
+ }
+
+ public int getMinor() {
+ return buffer.get(6) & 0xFF;
+ }
+
+ public void setMinor(int value) {
+ buffer.data[buffer.offset + 6] = (byte) value;
+ }
+
+ public int getRevision() {
+ return buffer.get(7) & 0xFF;
+ }
+
+ public void setRevision(int value) {
+ buffer.data[buffer.offset + 7] = (byte) value;
+ }
+
+ public Buffer getBuffer() {
+ return buffer;
+ }
+
+ public void setBuffer(Buffer value) {
+ setBuffer(value, true);
+ }
+
+ public void setBuffer(Buffer value, boolean validate) {
+ if (validate && !value.startsWith(PREFIX) || value.length() != 8) {
+ throw new IllegalArgumentException("Not an AMQP header buffer");
+ }
+ buffer = value.buffer();
+ }
+
+ public boolean hasValidPrefix() {
+ return buffer.startsWith(PREFIX);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < buffer.length(); ++i) {
+ char value = (char) buffer.get(i);
+ if (Character.isLetter(value)) {
+ builder.append(value);
+ } else {
+ builder.append(",");
+ builder.append((int) value);
+ }
+ }
+ return builder.toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index 9cd8f50..e636d83 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -22,7 +22,6 @@ import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
@@ -57,7 +56,6 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
receiver2.flow(1);
message.release();
-
// Read the message again and validate its state
message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
@@ -172,21 +170,4 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
connection.close();
}
-
- public void sendMessages(String destinationName, int count) throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- try {
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
-
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- sender.send(message);
- }
- } finally {
- connection.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
index 681ffbd..edf9459 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDrainTest.java
@@ -19,11 +19,11 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
@@ -33,46 +33,81 @@ import org.junit.Test;
public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
- public void testReceiverCanDrainMessages() throws Exception {
+ public void testReceiverCanDrainMessagesQueue() throws Exception {
+ doTestReceiverCanDrainMessages(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiverCanDrainMessagesTopic() throws Exception {
+ doTestReceiverCanDrainMessages(true);
+ }
+
+ private void doTestReceiverCanDrainMessages(boolean topic) throws Exception {
+ final String destinationName;
+ if (topic) {
+ destinationName = getTopicName();
+ } else {
+ destinationName = getQueueName();
+ }
+
int MSG_COUNT = 20;
- sendMessages(getQueueName(), MSG_COUNT);
AmqpClient client = createAmqpClient();
- AmqpConnection connection = client.connect();
+ AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getQueueName());
+ AmqpReceiver receiver = session.createReceiver(destinationName);
+
+ sendMessages(destinationName, MSG_COUNT);
+
+ Queue queueView = getProxyToQueue(destinationName);
- Queue queueView = getProxyToQueue(getQueueName());
assertEquals(MSG_COUNT, queueView.getMessageCount());
+ assertEquals(0, queueView.getDeliveringCount());
receiver.drain(MSG_COUNT);
for (int i = 0; i < MSG_COUNT; ++i) {
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(message);
+ assertNotNull("Failed to read message: " + (i + 1), message);
+ IntegrationTestLogger.LOGGER.info("Read message: " + message.getMessageId());
message.accept();
}
receiver.close();
- assertEquals(0, queueView.getMessageCount());
-
connection.close();
}
@Test(timeout = 60000)
- public void testPullWithNoMessageGetDrained() throws Exception {
+ public void testPullWithNoMessageGetDrainedQueue() throws Exception {
+ doTestPullWithNoMessageGetDrained(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testPullWithNoMessageGetDrainedTopic() throws Exception {
+ doTestPullWithNoMessageGetDrained(true);
+ }
+
+ private void doTestPullWithNoMessageGetDrained(boolean topic) throws Exception {
+
+ final String destinationName;
+ if (topic) {
+ destinationName = getTopicName();
+ } else {
+ destinationName = getQueueName();
+ }
AmqpClient client = createAmqpClient();
- AmqpConnection connection = client.connect();
+ AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getQueueName());
+ AmqpReceiver receiver = session.createReceiver(destinationName);
receiver.flow(10);
- Queue queueView = getProxyToQueue(getQueueName());
+ Queue queueView = getProxyToQueue(destinationName);
+
assertEquals(0, queueView.getMessageCount());
- assertEquals(0, queueView.getDeliveringCount());
+ assertEquals(0, queueView.getMessagesAcknowledged());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
@@ -84,18 +119,36 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
- public void testPullOneFromRemote() throws Exception {
- int MSG_COUNT = 20;
- sendMessages(getQueueName(), MSG_COUNT);
+ public void testPullOneFromRemoteQueue() throws Exception {
+ doTestPullOneFromRemote(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testPullOneFromRemoteTopic() throws Exception {
+ doTestPullOneFromRemote(true);
+ }
+
+ private void doTestPullOneFromRemote(boolean topic) throws Exception {
AmqpClient client = createAmqpClient();
- AmqpConnection connection = client.connect();
+ AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getQueueName());
+ final String destinationName;
+ if (topic) {
+ destinationName = getTopicName();
+ } else {
+ destinationName = getQueueName();
+ }
+
+ AmqpReceiver receiver = session.createReceiver(destinationName);
- Queue queueView = getProxyToQueue(getQueueName());
+ int MSG_COUNT = 20;
+ sendMessages(destinationName, MSG_COUNT);
+
+ Queue queueView = getProxyToQueue(destinationName);
assertEquals(MSG_COUNT, queueView.getMessageCount());
+ assertEquals(0, queueView.getDeliveringCount());
assertEquals(0, receiver.getReceiver().getRemoteCredit());
@@ -107,24 +160,39 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
receiver.close();
- assertEquals(MSG_COUNT - 1, queueView.getMessageCount());
- assertEquals(1, queueView.getMessagesAcknowledged());
-
connection.close();
}
@Test(timeout = 60000)
- public void testMultipleZeroResultPulls() throws Exception {
+ public void testMultipleZeroResultPullsQueue() throws Exception {
+ doTestMultipleZeroResultPulls(false);
+ }
+
+ @Test(timeout = 60000)
+ public void testMultipleZeroResultPullsTopic() throws Exception {
+ doTestMultipleZeroResultPulls(true);
+ }
+
+ private void doTestMultipleZeroResultPulls(boolean topic) throws Exception {
+
AmqpClient client = createAmqpClient();
- AmqpConnection connection = client.connect();
+ AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpReceiver receiver = session.createReceiver(getQueueName());
+ final String destinationName;
+ if (topic) {
+ destinationName = getTopicName();
+ } else {
+ destinationName = getQueueName();
+ }
+
+ AmqpReceiver receiver = session.createReceiver(destinationName);
receiver.flow(10);
- Queue queueView = getProxyToQueue(getQueueName());
+ Queue queueView = getProxyToQueue(destinationName);
assertEquals(0, queueView.getMessageCount());
+ assertEquals(0, queueView.getDeliveringCount());
assertEquals(10, receiver.getReceiver().getRemoteCredit());
@@ -139,27 +207,4 @@ public class AmqpReceiverDrainTest extends AmqpClientTestSupport {
connection.close();
}
-
- public void sendMessages(String destinationName, int count) throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = null;
-
- try {
- connection = client.connect();
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
-
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message-" + i);
- sender.send(message);
- }
-
- sender.close();
- } finally {
- if (connection != null) {
- connection.close();
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
index b47ad50..3aff030 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverTest.java
@@ -16,13 +16,38 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
+import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpUnknownFilterType;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
import org.junit.Test;
/**
@@ -31,6 +56,119 @@ import org.junit.Test;
public class AmqpReceiverTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
+ public void testCreateQueueReceiver() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ Queue queue = getProxyToQueue(getQueueName());
+ assertNotNull(queue);
+
+ receiver.close();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateTopicReceiver() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTopicName());
+
+ Queue queue = getProxyToQueue(getQueueName());
+ assertNotNull(queue);
+
+ receiver.close();
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ client.setValidator(new AmqpValidator() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inspectOpenedResource(Receiver receiver) {
+
+ if (receiver.getRemoteSource() == null) {
+ markAsInvalid("Link opened with null source.");
+ }
+
+ Source source = (Source) receiver.getRemoteSource();
+ Map<Symbol, Object> filters = source.getFilter();
+
+ // Currently don't support noLocal on a Queue
+ if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
+ markAsInvalid("Broker did not return the NoLocal Filter on Attach");
+ }
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ session.createReceiver(getQueueName(), null, true);
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testCreateQueueReceiverWithJMSSelector() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ client.setValidator(new AmqpValidator() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inspectOpenedResource(Receiver receiver) {
+
+ if (receiver.getRemoteSource() == null) {
+ markAsInvalid("Link opened with null source.");
+ }
+
+ Source source = (Source) receiver.getRemoteSource();
+ Map<Symbol, Object> filters = source.getFilter();
+
+ if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
+ markAsInvalid("Broker did not return the JMS Filter on Attach");
+ }
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ session.createReceiver(getQueueName(), "JMSPriority > 8");
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testInvalidFilter() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ try {
+ session.createReceiver(getQueueName(), "null = 'f''", true);
+ fail("should throw exception");
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof JMSException);
+ }
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
}
@@ -96,4 +234,164 @@ public class AmqpReceiverTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testClientIdIsSetInSubscriptionList() throws Exception {
+ server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ connection.setContainerId("testClient");
+ connection.connect();
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ Source source = new Source();
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setCapabilities(Symbol.getSymbol("topic"));
+ source.setAddress("mytopic");
+ session.createReceiver(source, "testSub");
+
+ SimpleString fo = new SimpleString("testClient.testSub:mytopic");
+ assertNotNull(server.locateQueue(fo));
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testLinkDetachSentWhenQueueDeleted() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+ server.destroyQueue(new SimpleString(getQueueName()), null, false, true);
+
+ assertTrue("Receiver should have closed", Wait.waitFor(receiver::isClosed));
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+ AddressSettings value = new AddressSettings();
+ value.setAutoCreateQueues(false);
+ value.setAutoCreateAddresses(false);
+ server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ try {
+ AmqpSession session = connection.createSession();
+
+ Exception expectedException = null;
+ try {
+ session.createSender("AnAddressThatDoesNotExist");
+ fail("Creating a sender here on an address that doesn't exist should fail");
+ } catch (Exception e) {
+ expectedException = e;
+ }
+
+ assertNotNull(expectedException);
+ assertTrue(expectedException.getMessage().contains("amqp:not-found"));
+ assertTrue(expectedException.getMessage().contains("target address does not exist"));
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testUnsupportedFiltersAreNotListedAsSupported() throws Exception {
+ AmqpClient client = createAmqpClient();
+
+ client.setValidator(new AmqpValidator() {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void inspectOpenedResource(Receiver receiver) {
+
+ if (receiver.getRemoteSource() == null) {
+ markAsInvalid("Link opened with null source.");
+ }
+
+ Source source = (Source) receiver.getRemoteSource();
+ Map<Symbol, Object> filters = source.getFilter();
+
+ if (findFilter(filters, AmqpUnknownFilterType.UNKNOWN_FILTER_IDS) != null) {
+ markAsInvalid("Broker should not return unsupported filter on attach.");
+ }
+ }
+ });
+
+ Map<Symbol, DescribedType> filters = new HashMap<>();
+ filters.put(AmqpUnknownFilterType.UNKNOWN_FILTER_NAME, AmqpUnknownFilterType.UNKNOWN_FILTER);
+
+ Source source = new Source();
+ source.setAddress(getQueueName());
+ source.setFilter(filters);
+ source.setDurable(TerminusDurability.NONE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ assertEquals(0, server.getTotalConsumerCount());
+
+ session.createReceiver(source);
+
+ assertEquals(1, server.getTotalConsumerCount());
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testReceiverCloseSendsRemoteClose() throws Exception {
+ AmqpClient client = createAmqpClient();
+ assertNotNull(client);
+
+ final AtomicBoolean closed = new AtomicBoolean();
+
+ client.setValidator(new AmqpValidator() {
+
+ @Override
+ public void inspectClosedResource(Session session) {
+ IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext());
+ }
+
+ @Override
+ public void inspectDetachedResource(Receiver receiver) {
+ markAsInvalid("Broker should not detach receiver linked to closed session.");
+ }
+
+ @Override
+ public void inspectClosedResource(Receiver receiver) {
+ IntegrationTestLogger.LOGGER.info("Receiver closed: " + receiver.getContext());
+ closed.set(true);
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ assertNotNull(connection);
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ assertNotNull(receiver);
+
+ receiver.close();
+
+ assertTrue("Did not process remote close as expected", closed.get());
+ connection.getStateInspector().assertValid();
+
+ connection.close();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
index 748f10a..6459e76 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpScheduledMessageTest.java
@@ -104,7 +104,7 @@ public class AmqpScheduledMessageTest extends AmqpClientTestSupport {
}
}
- @Test
+ @Test(timeout = 60000)
public void testScheduleWithDelay() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
index 8e41d71..f99fc14 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSecurityTest.java
@@ -17,21 +17,9 @@
package org.apache.activemq.artemis.tests.integration.amqp;
import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.security.Role;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -39,44 +27,26 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
public class AmqpSecurityTest extends AmqpClientTestSupport {
- private String user1 = "user1";
- private String password1 = "password1";
-
@Override
- protected ActiveMQServer createServer() throws Exception {
- ActiveMQServer server = createServer(true, true);
- ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
- securityManager.getConfiguration().addUser("foo", "bar");
- securityManager.getConfiguration().addRole("foo", "none");
- securityManager.getConfiguration().addUser(user1, password1);
- securityManager.getConfiguration().addRole(user1, "none");
- HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository();
- HashSet<Role> value = new HashSet<>();
- value.add(new Role("none", false, true, true, true, true, true, true, true));
- securityRepository.addMatch(getQueueName(), value);
-
- serverManager = new JMSServerManagerImpl(server);
- Configuration serverConfig = server.getConfiguration();
- serverConfig.getAddressesSettings().put("jms.queue.#", new AddressSettings().setAutoCreateJmsQueues(true).setDeadLetterAddress(new SimpleString("jms.queue.ActiveMQ.DLQ")));
- serverConfig.setSecurityEnabled(true);
- serverManager.start();
- server.start();
- return server;
+ protected boolean isSecurityEnabled() {
+ return true;
}
@Test(timeout = 60000)
public void testSaslAuthWithInvalidCredentials() throws Exception {
AmqpConnection connection = null;
- AmqpClient client = createAmqpClient("foo", "foo");
+ AmqpClient client = createAmqpClient(fullUser, guestUser);
try {
connection = client.connect();
- fail("Should authenticate even with authzid set");
+ fail("Should not authenticate when invalid credentials provided");
} catch (Exception ex) {
+ // Expected
} finally {
if (connection != null) {
connection.close();
@@ -87,8 +57,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSaslAuthWithAuthzid() throws Exception {
AmqpConnection connection = null;
- AmqpClient client = createAmqpClient("foo", "bar");
- client.setAuthzid("foo");
+ AmqpClient client = createAmqpClient(guestUser, guestPass);
+ client.setAuthzid(guestUser);
try {
connection = client.connect();
@@ -104,7 +74,7 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSaslAuthWithoutAuthzid() throws Exception {
AmqpConnection connection = null;
- AmqpClient client = createAmqpClient("foo", "bar");
+ AmqpClient client = createAmqpClient(guestUser, guestPass);
try {
connection = client.connect();
@@ -119,20 +89,22 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendAndRejected() throws Exception {
- AmqpConnection connection = null;
- AmqpClient client = createAmqpClient("foo", "bar");
CountDownLatch latch = new CountDownLatch(1);
+
+ AmqpClient client = createAmqpClient(guestUser, guestPass);
client.setValidator(new AmqpValidator() {
+
@Override
- public void inspectDeliveryUpdate(Delivery delivery) {
- super.inspectDeliveryUpdate(delivery);
+ public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
if (!delivery.remotelySettled()) {
markAsInvalid("delivery is not remotely settled");
}
+
latch.countDown();
}
});
- connection = addConnection(client.connect());
+
+ AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
@@ -145,8 +117,8 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
try {
sender.send(message);
} catch (IOException e) {
- //
}
+
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
connection.getStateInspector().assertValid();
connection.close();
@@ -154,11 +126,9 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageFailsOnAnonymousRelayWhenNotAuthorizedToSendToAddress() throws Exception {
- server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST));
- server.createQueue(new SimpleString(getQueueName()), RoutingType.ANYCAST, new SimpleString(getQueueName()), null, true, false);
-
- AmqpClient client = createAmqpClient(user1, password1);
+ AmqpClient client = createAmqpClient(guestUser, guestPass);
AmqpConnection connection = client.connect();
+
try {
AmqpSession session = connection.createSession();
@@ -181,5 +151,4 @@ public class AmqpSecurityTest extends AmqpClientTestSupport {
connection.close();
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 9cf256a..0cae79f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -16,28 +16,23 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
-import static org.apache.activemq.transport.amqp.AmqpSupport.JMS_SELECTOR_FILTER_IDS;
-import static org.apache.activemq.transport.amqp.AmqpSupport.NO_LOCAL_FILTER_IDS;
import static org.apache.activemq.transport.amqp.AmqpSupport.contains;
-import static org.apache.activemq.transport.amqp.AmqpSupport.findFilter;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.JMSException;
+import javax.jms.Topic;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
-import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
@@ -48,8 +43,6 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.jgroups.util.UUID;
import org.junit.Test;
@@ -63,19 +56,14 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
- @Test(timeout = 60000)
- public void testCreateQueueReceiver() throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getQueueName());
-
- Queue queue = getProxyToQueue(getQueueName());
- assertNotNull(queue);
+ @Override
+ protected boolean isAutoCreateQueues() {
+ return false;
+ }
- receiver.close();
- connection.close();
+ @Override
+ protected boolean isAutoCreateAddresses() {
+ return false;
}
@Test(timeout = 60000)
@@ -103,90 +91,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertEquals(0, queue.getMessageCount());
}
-
- @Test(timeout = 60000)
- public void testCreateQueueReceiverWithJMSSelector() throws Exception {
- AmqpClient client = createAmqpClient();
-
- client.setValidator(new AmqpValidator() {
-
- @SuppressWarnings("unchecked")
- @Override
- public void inspectOpenedResource(Receiver receiver) {
-
- if (receiver.getRemoteSource() == null) {
- markAsInvalid("Link opened with null source.");
- }
-
- Source source = (Source) receiver.getRemoteSource();
- Map<Symbol, Object> filters = source.getFilter();
-
- if (findFilter(filters, JMS_SELECTOR_FILTER_IDS) == null) {
- markAsInvalid("Broker did not return the JMS Filter on Attach");
- }
- }
- });
-
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- session.createReceiver(getQueueName(), "JMSPriority > 8");
-
- connection.getStateInspector().assertValid();
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testCreateQueueReceiverWithNoLocalSet() throws Exception {
- AmqpClient client = createAmqpClient();
-
- client.setValidator(new AmqpValidator() {
-
- @SuppressWarnings("unchecked")
- @Override
- public void inspectOpenedResource(Receiver receiver) {
-
- if (receiver.getRemoteSource() == null) {
- markAsInvalid("Link opened with null source.");
- }
-
- Source source = (Source) receiver.getRemoteSource();
- Map<Symbol, Object> filters = source.getFilter();
-
- // Currently don't support noLocal on a Queue
- if (findFilter(filters, NO_LOCAL_FILTER_IDS) != null) {
- markAsInvalid("Broker did not return the NoLocal Filter on Attach");
- }
- }
- });
-
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- session.createReceiver(getQueueName(), null, true);
-
- connection.getStateInspector().assertValid();
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testInvalidFilter() throws Exception {
- AmqpClient client = createAmqpClient();
-
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- try {
- session.createReceiver(getQueueName(), "null = 'f''", true);
- fail("should throw exception");
- } catch (Exception e) {
- assertTrue(e.getCause() instanceof JMSException);
- //passed
- }
-
- connection.close();
- }
-
@Test(timeout = 60000)
public void testQueueReceiverReadMessage() throws Exception {
sendMessages(getQueueName(), 1);
@@ -210,108 +114,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
- public void testQueueReceiverReadMessageWithDivert() throws Exception {
- final String forwardingAddress = getQueueName() + "Divert";
- final SimpleString simpleForwardingAddress = SimpleString.toSimpleString(forwardingAddress);
- server.createQueue(simpleForwardingAddress, RoutingType.ANYCAST, simpleForwardingAddress, null, true, false);
- server.getActiveMQServerControl().createDivert("name", "routingName", getQueueName(), forwardingAddress, true, null, null, DivertConfigurationRoutingType.ANYCAST.toString());
- sendMessages(getQueueName(), 1);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(forwardingAddress);
-
- Queue queueView = getProxyToQueue(forwardingAddress);
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testAnycastMessageRoutingExclusivityUsingPrefix() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
- sendMessages("anycast://" + addressA, 1);
-
- assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
- }
-
- @Test(timeout = 60000)
- public void testAnycastMessageRoutingExclusivityUsingProperty() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
- sendMessages(addressA, 1, RoutingType.ANYCAST);
-
- assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount());
- }
-
- @Test
- public void testMulticastMessageRoutingExclusivityUsingPrefix() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
- sendMessages("multicast://" + addressA, 1);
-
- assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
- assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- }
-
- @Test
- public void testMulticastMessageRoutingExclusivityUsingProperty() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-
- sendMessages(addressA, 1, RoutingType.MULTICAST);
-
- assertEquals(0, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount());
- assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- }
-
- @Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getQueueName(), 1, false);
@@ -870,7 +672,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message1.setMessageId("ID:Message:1");
sender.send(message1);
- assertEquals(1, queue.getMessageCount());
+ assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
receiver1.flow(1);
message1 = receiver1.receive(50, TimeUnit.SECONDS);
assertNotNull("Should have read a message", message1);
@@ -884,7 +686,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
message2.setMessageId("ID:Message:2");
sender.send(message2);
- assertEquals(1, queue.getMessageCount());
+ assertTrue("Message did not arrive", Wait.waitFor(() -> queue.getMessageCount() == 1));
receiver1.flow(1);
message2 = receiver1.receive(50, TimeUnit.SECONDS);
assertNotNull("Should have read a message", message2);
@@ -1018,7 +820,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
- @Test
+ @Test(timeout = 60000)
public void testDeliveryDelayOfferedWhenRequested() throws Exception {
AmqpClient client = createAmqpClient();
client.setValidator(new AmqpValidator() {
@@ -1036,7 +838,7 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender("queue://" + getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
+ AmqpSender sender = session.createSender(getQueueName(), new Symbol[] {AmqpSupport.DELAYED_DELIVERY});
assertNotNull(sender);
connection.getStateInspector().assertValid();
@@ -1100,45 +902,119 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
- public void sendMessages(String destinationName, int count) throws Exception {
- sendMessages(destinationName, count, null);
+ @Test(timeout = 60000)
+ public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ Exception expectedException = null;
+ try {
+ session.createSender("AnAddressThatDoesNotExist");
+ fail("Creating a sender here on an address that doesn't exist should fail");
+ } catch (Exception e) {
+ expectedException = e;
+ }
+
+ assertNotNull(expectedException);
+ assertTrue(expectedException.getMessage().contains("amqp:not-found"));
+ assertTrue(expectedException.getMessage().contains("target address does not exist"));
+
+ connection.close();
}
- public void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+ @Test(timeout = 60000)
+ public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
+ String queueName = "TestQueueName";
+ String address = "TestAddress";
+ server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
+ server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
+
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
+
try {
AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
+ AmqpSender sender = session.createSender(address);
+ AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(1);
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- if (routingType != null) {
- message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
- }
- sender.send(message);
- }
+ AmqpMessage message = new AmqpMessage();
+ message.setText("TestPayload");
+ sender.send(message);
+
+ AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(receivedMessage);
} finally {
connection.close();
}
}
- public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+ @Test(timeout = 60000)
+ public void testSendReceiveLotsOfDurableMessagesOnQueue() throws Exception {
+ doTestSendReceiveLotsOfDurableMessages(Queue.class);
+ }
+
+ @Test(timeout = 60000)
+ public void testSendReceiveLotsOfDurableMessagesOnTopic() throws Exception {
+ doTestSendReceiveLotsOfDurableMessages(Topic.class);
+ }
+
+ private void doTestSendReceiveLotsOfDurableMessages(Class<?> destType) throws Exception {
+ final int MSG_COUNT = 1000;
+
AmqpClient client = createAmqpClient();
+
AmqpConnection connection = addConnection(client.connect());
- try {
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
+ AmqpSession session = connection.createSession();
+
+ final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+ final AtomicBoolean error = new AtomicBoolean(false);
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ final String address;
+ if (Queue.class.equals(destType)) {
+ address = getQueueName();
+ } else {
+ address = getTopicName();
+ }
+
+ final AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(MSG_COUNT);
+
+ AmqpSender sender = session.createSender(address);
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- message.setDurable(durable);
- sender.send(message);
+ Queue queueView = getProxyToQueue(address);
+
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ for (int i = 0; i < MSG_COUNT; i++) {
+ try {
+ AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+ received.accept();
+ done.countDown();
+ } catch (Exception ex) {
+ LOG.info("Caught error: {}", ex.getClass().getSimpleName());
+ error.set(true);
+ }
+ }
}
- } finally {
- connection.close();
+ });
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("msg" + i);
+ sender.send(message);
}
+
+ assertTrue("did not read all messages, waiting on: " + done.getCount(), done.await(10, TimeUnit.SECONDS));
+ assertFalse("should not be any errors on receive", error.get());
+ assertTrue("Should be no inflight messages.", Wait.waitFor(() -> queueView.getDeliveringCount() == 0));
+
+ sender.close();
+ receiver.close();
+ connection.close();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index 7b8cbef..8c95064 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -16,14 +16,22 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
import org.junit.Test;
/**
@@ -101,4 +109,74 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testUnsettledSender() throws Exception {
+ final int MSG_COUNT = 1000;
+
+ final CountDownLatch settled = new CountDownLatch(MSG_COUNT);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+
+ connection.setStateInspector(new AmqpValidator() {
+
+ @Override
+ public void inspectDeliveryUpdate(Sender sender, Delivery delivery) {
+ if (delivery.remotelySettled()) {
+ IntegrationTestLogger.LOGGER.trace("Remote settled message for sender: " + sender.getName());
+ settled.countDown();
+ }
+ }
+ });
+
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getQueueName(), false);
+
+ for (int i = 1; i <= MSG_COUNT; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message: " + i);
+ sender.send(message);
+
+ if (i % 1000 == 0) {
+ IntegrationTestLogger.LOGGER.info("Sent message: " + i);
+ }
+ }
+
+ Queue queueView = getProxyToQueue(getQueueName());
+ assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+ sender.close();
+
+ assertTrue("Remote should have settled all deliveries", settled.await(5, TimeUnit.MINUTES));
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testPresettledSender() throws Exception {
+ final int MSG_COUNT = 1000;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(getQueueName(), true);
+
+ for (int i = 1; i <= MSG_COUNT; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message: " + i);
+ sender.send(message);
+
+ if (i % 1000 == 0) {
+ IntegrationTestLogger.LOGGER.info("Sent message: " + i);
+ }
+ }
+
+ Queue queueView = getProxyToQueue(getQueueName());
+ assertTrue("All messages should arrive", Wait.waitFor(() -> queueView.getMessageCount() == MSG_COUNT));
+
+ sender.close();
+ connection.close();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
new file mode 100644
index 0000000..0048be5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.transport.amqp.client.AmqpValidator;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Session;
+import org.junit.Test;
+
+public class AmqpSessionTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testCreateSession() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testSessionClosedDoesNotGetReceiverDetachFromRemote() throws Exception {
+ AmqpClient client = createAmqpClient();
+ assertNotNull(client);
+
+ client.setValidator(new AmqpValidator() {
+
+ @Override
+ public void inspectClosedResource(Session session) {
+ IntegrationTestLogger.LOGGER.info("Session closed: " + session.getContext());
+ }
+
+ @Override
+ public void inspectDetachedResource(Receiver receiver) {
+ markAsInvalid("Broker should not detach receiver linked to closed session.");
+ }
+
+ @Override
+ public void inspectClosedResource(Receiver receiver) {
+ markAsInvalid("Broker should not close receiver linked to closed session.");
+ }
+ });
+
+ AmqpConnection connection = addConnection(client.connect());
+ assertNotNull(connection);
+ AmqpSession session = connection.createSession();
+ assertNotNull(session);
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ assertNotNull(receiver);
+
+ session.close();
+
+ connection.getStateInspector().assertValid();
+ connection.close();
+ }
+}