You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/04/28 09:17:00 UTC
[3/7] activemq-artemis git commit: ARTEMIS-1123 Major AMQP Test Suite
refactoring
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
deleted file mode 100644
index 7933fec..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ /dev/null
@@ -1,1788 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.QueueBrowser;
-import javax.jms.ResourceAllocationException;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
-import org.apache.activemq.artemis.api.core.RoutingType;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.AddressControl;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.impl.AddressInfo;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
-import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
-import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
-import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.apache.activemq.artemis.utils.Base64;
-import org.apache.activemq.artemis.utils.ByteUtil;
-import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.TimeUtils;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
-import org.apache.activemq.transport.amqp.client.AmqpClient;
-import org.apache.activemq.transport.amqp.client.AmqpConnection;
-import org.apache.activemq.transport.amqp.client.AmqpMessage;
-import org.apache.activemq.transport.amqp.client.AmqpReceiver;
-import org.apache.activemq.transport.amqp.client.AmqpSender;
-import org.apache.activemq.transport.amqp.client.AmqpSession;
-import org.apache.qpid.jms.JmsConnectionFactory;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.messaging.Source;
-import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ProtonTest extends ProtonTestBase {
-
- // AMQP-scheme broker URI on localhost:5672.
- private static final String amqpConnectionUri = "amqp://localhost:5672";
-
- // Same host and port with a tcp scheme; the AmqpClient-based tests in this class connect through it.
- private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
-
- // 1 MiB; lower bound used when asserting the size of a filled address.
- private static final long maxSizeBytes = 1 * 1024 * 1024;
-
- // 2 MiB; the address-size threshold the "address full" tests assert against.
- private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
-
- // MBeanServer handed to the broker in createAMQPServer and used to look up management controls.
- private MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer();
-
- // Number of sends completed by the most recent sendUntilFull call.
- private int messagesSent = 0;
-
- /**
-  * Supplies the parameter sets for the {@link Parameterized} runner. Every test in this class runs once per
-  * entry, and the entry's values are passed to the constructor as {@code (name, protocol)}.
-  *
-  * @return the two configurations: {@code "AMQP"} with protocol 0 and {@code "AMQP_ANONYMOUS"} with protocol 3
-  */
- @Parameterized.Parameters(name = "{0}")
- public static Collection<Object[]> getParameters() {
-    return Arrays.asList(new Object[][]{{"AMQP", 0}, {"AMQP_ANONYMOUS", 3}});
- }
-
-
- ConnectionFactory factory;
-
- private final int protocol;
-
- /**
-  * Creates one parameterized instance of the test.
-  *
-  * @param name     label of the parameter set; the constructor itself does not use it
-  * @param protocol numeric protocol selector, stored in the {@code protocol} field
-  */
- public ProtonTest(String name, int protocol) {
-    this.coreAddress = "exampleQueue";
-    this.protocol = protocol;
-    // Both branches of the former protocol check assigned "exampleQueue", so the address is simply the
-    // core address for every protocol value.
-    this.address = coreAddress;
- }
-
- private final String coreAddress;
- private final String address;
- private Connection connection;
-
-
- /**
-  * Builds the broker through the base class and then customises it for this test: an extra acceptor named
-  * "flow" is registered on {@code 8 + port} with AMQP credits pinned to 1, and JMX management is enabled
-  * against this test's MBeanServer.
-  *
-  * @param port base port passed on to the superclass
-  * @return the configured server
-  * @throws Exception if the superclass or the acceptor registration fails
-  */
- @Override
- protected ActiveMQServer createAMQPServer(int port) throws Exception {
-    final ActiveMQServer amqpServer = super.createAMQPServer(port);
-    final String flowAcceptorUri = "tcp://localhost:" + (8 + port) + "?protocols=AMQP;useEpoll=false;amqpCredits=1;amqpMinCredits=1";
-
-    amqpServer.getConfiguration().addAcceptorConfiguration("flow", flowAcceptorUri);
-    amqpServer.setMBeanServer(mBeanServer);
-    amqpServer.getConfiguration().setJMXManagementEnabled(true);
-
-    return amqpServer;
- }
-
- /**
-  * Prepares the broker for each test: disables queue auto-creation on the "#" address settings, creates the
-  * anycast addresses and queues {@code coreAddress} and {@code coreAddress + "1"} .. {@code coreAddress + "10"},
-  * creates the multicast address and queue "amqp_testtopic", and finally opens the shared JMS connection.
-  *
-  * @throws Exception if the broker rejects any of the definitions or the connection cannot be created
-  */
- @Override
- @Before
- public void setUp() throws Exception {
-    super.setUp();
-
-    Configuration serverConfig = server.getConfiguration();
-    Map<String, AddressSettings> settings = serverConfig.getAddressesSettings();
-    assertNotNull(settings);
-    AddressSettings addressSetting = settings.get("#");
-    if (addressSetting == null) {
-       addressSetting = new AddressSettings();
-       settings.put("#", addressSetting);
-    }
-    addressSetting.setAutoCreateQueues(false);
-
-    // coreAddress itself followed by the numbered variants 1..10.
-    final String[] anycastNames = new String[11];
-    anycastNames[0] = coreAddress;
-    for (int i = 1; i < anycastNames.length; i++) {
-       anycastNames[i] = coreAddress + i;
-    }
-
-    // All addresses are registered before any queue is created, as before.
-    for (String name : anycastNames) {
-       server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(name), RoutingType.ANYCAST));
-    }
-    for (String name : anycastNames) {
-       server.createQueue(new SimpleString(name), RoutingType.ANYCAST, new SimpleString(name), null, true, false);
-    }
-
-    server.addAddressInfo(new AddressInfo(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST));
-    server.createQueue(new SimpleString("amqp_testtopic"), RoutingType.MULTICAST, new SimpleString("amqp_testtopic"), null, true, false);
-
-    connection = createConnection();
- }
-
- /**
-  * Waits briefly, closes the shared JMS connection and then runs the superclass tear-down.
-  * The connection is closed even if the wait is interrupted, and the superclass tear-down runs even if
-  * closing the connection fails.
-  *
-  * @throws Exception whatever the wait, the close or the superclass tear-down raises
-  */
- @Override
- @After
- public void tearDown() throws Exception {
-    try {
-       Thread.sleep(250);
-    } finally {
-       try {
-          if (connection != null) {
-             connection.close();
-          }
-       } finally {
-          super.tearDown();
-       }
-    }
- }
-
- /**
-  * Publishes one text message to "amqp_testtopic" and checks that a subscriber created beforehand on the
-  * same session receives it once the connection is started.
-  */
- @Test
- public void testSendAndReceiveOnTopic() throws Exception {
-    final Connection topicConnection = createConnection("myClientId");
-    try {
-       final TopicSession topicSession = (TopicSession) topicConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       final Topic destination = topicSession.createTopic("amqp_testtopic");
-       final TopicSubscriber subscriber = topicSession.createSubscriber(destination);
-       final TopicPublisher publisher = topicSession.createPublisher(destination);
-
-       final TextMessage outgoing = topicSession.createTextMessage("test-message");
-       publisher.send(outgoing);
-       publisher.close();
-
-       // Delivery only begins after start(); the subscriber already exists at this point.
-       topicConnection.start();
-
-       final TextMessage incoming = (TextMessage) subscriber.receive(1000);
-       assertNotNull(incoming);
-       assertNotNull(incoming.getText());
-    } finally {
-       topicConnection.close();
-    }
- }
-
- /**
-  * Sends a Base64-encoded bytes message through the address management control and verifies that a JMS
-  * consumer receives it as a {@link BytesMessage} with the same content.
-  */
- @Test
- public void testAddressControlSendMessage() throws Exception {
-    SimpleString address = RandomUtil.randomSimpleString();
-    server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
-
-    AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
-    Assert.assertEquals(1, addressControl.getQueueNames().length);
-    // Encode and decode with an explicit charset so the test does not depend on the platform default.
-    addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.BYTES_TYPE, Base64.encodeBytes("test".getBytes(java.nio.charset.StandardCharsets.UTF_8)), false, userName, password);
-
-    Wait.waitFor(() -> addressControl.getMessageCount() == 1);
-
-    Assert.assertEquals(1, addressControl.getMessageCount());
-
-    Connection connection = createConnection("myClientId");
-    try {
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       javax.jms.Queue queue = session.createQueue(address.toString());
-       MessageConsumer consumer = session.createConsumer(queue);
-       Message message = consumer.receive(500);
-       assertNotNull(message);
-       BytesMessage bytesMessage = (BytesMessage) message;
-       byte[] buffer = new byte[(int) bytesMessage.getBodyLength()];
-       bytesMessage.readBytes(buffer);
-       assertEquals("test", new String(buffer, java.nio.charset.StandardCharsets.UTF_8));
-       session.close();
-    } finally {
-       // Single close point; the connection was created just before the try, so it is never null here.
-       connection.close();
-    }
- }
-
- /**
-  * Sends a text message through the address management control and verifies that a JMS consumer receives
-  * it as a {@link TextMessage} with the same body.
-  */
- @Test
- public void testAddressControlSendMessageWithText() throws Exception {
-    SimpleString address = RandomUtil.randomSimpleString();
-    server.createQueue(address, RoutingType.ANYCAST, address, null, true, false);
-
-    AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mBeanServer);
-    Assert.assertEquals(1, addressControl.getQueueNames().length);
-    addressControl.sendMessage(null, org.apache.activemq.artemis.api.core.Message.TEXT_TYPE, "test", false, userName, password);
-
-    Wait.waitFor(() -> addressControl.getMessageCount() == 1);
-
-    Assert.assertEquals(1, addressControl.getMessageCount());
-
-    Connection connection = createConnection("myClientId");
-    try {
-       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       javax.jms.Queue queue = session.createQueue(address.toString());
-       MessageConsumer consumer = session.createConsumer(queue);
-       Message message = consumer.receive(500);
-       assertNotNull(message);
-       assertEquals("test", ((TextMessage) message).getText());
-       session.close();
-    } finally {
-       // Single close point; the connection was created just before the try, so it is never null here.
-       connection.close();
-    }
- }
-
- /**
-  * Creates the durable subscription "myDurSub" for client "myClientId", reconnects, and checks that the
-  * binding "myClientId.myDurSub" exists until the subscription is explicitly unsubscribed.
-  */
- @Test
- public void testDurableSubscriptionUnsubscribe() throws Exception {
-    Connection durableConnection = createConnection("myClientId");
-    try {
-       // First connection: only establishes the durable subscription.
-       Session durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       final Topic topic = durableSession.createTopic("amqp_testtopic");
-       TopicSubscriber subscriber = durableSession.createDurableSubscriber(topic, "myDurSub");
-       durableSession.close();
-       durableConnection.close();
-
-       // Second connection with the same client id: re-attach, detach, then unsubscribe.
-       durableConnection = createConnection("myClientId");
-       durableSession = durableConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       subscriber = durableSession.createDurableSubscriber(topic, "myDurSub");
-       subscriber.close();
-
-       Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
-       durableSession.unsubscribe("myDurSub");
-       Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId.myDurSub")));
-
-       durableSession.close();
-       durableConnection.close();
-    } finally {
-       if (durableConnection != null) {
-          durableConnection.close();
-       }
-    }
- }
-
- /**
-  * Verifies that the binding of a non-durable topic subscription disappears once its connection is closed:
-  * "amqp_testtopic" has two bindings while the subscriber exists and one after the connection is gone.
-  */
- @Test
- public void testTemporarySubscriptionDeleted() throws Exception {
-    try {
-       TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-       Topic topic = session.createTopic("amqp_testtopic");
-       // Non-durable subscriber; only its server-side binding matters for the checks below.
-       session.createSubscriber(topic);
-       Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
-       Assert.assertEquals(2, bindingsForAddress.getBindings().size());
-       session.close();
-       final CountDownLatch latch = new CountDownLatch(1);
-       server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
-          @Override
-          public void connectionClosed() {
-             latch.countDown();
-          }
-       });
-       connection.close();
-       // Fail here, with a clear reason, if the broker never reported the close instead of silently
-       // continuing after the timeout.
-       Assert.assertTrue("server did not signal the connection close within 5 seconds", latch.await(5, TimeUnit.SECONDS));
-       bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
-       Assert.assertEquals(1, bindingsForAddress.getBindings().size());
-    } finally {
-       if (connection != null) {
-          connection.close();
-       }
-    }
- }
-
- /**
-  * Connects to port 5680 and checks that a freshly attached sender holds exactly one credit.
-  */
- @Test
- public void testCreditsAreAllocatedOnlyOnceOnLinkCreate() throws Exception {
-
-    String destinationAddress = address + 1;
-    AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
-    AmqpConnection amqpConnection = client.connect();
-    try {
-       AmqpSession session = amqpConnection.createSession();
-       AmqpSender sender = session.createSender(destinationAddress);
-       // assertEquals reports the actual credit on failure, unlike assertTrue(x == 1).
-       assertEquals(1, sender.getSender().getCredit());
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- /**
-  * Sends one text message to a temporary queue and checks that a consumer on the same session receives it.
-  */
- @Test
- public void testTemporaryQueue() throws Throwable {
-    final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    final TemporaryQueue tempQueue = jmsSession.createTemporaryQueue();
-    System.out.println("queue:" + tempQueue.getQueueName());
-
-    final MessageProducer producer = jmsSession.createProducer(tempQueue);
-    final TextMessage outgoing = jmsSession.createTextMessage();
-    outgoing.setText("Message temporary");
-    producer.send(outgoing);
-
-    final MessageConsumer consumer = jmsSession.createConsumer(tempQueue);
-    connection.start();
-
-    final TextMessage incoming = (TextMessage) consumer.receive(5000);
-    Assert.assertNotNull(incoming);
- }
-
- /**
-  * Sends ten messages in a transacted session, commits, and waits for the core queue to report ten messages.
-  */
- @Test
- public void testCommitProducer() throws Throwable {
-    final Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-    final javax.jms.Queue destination = createQueue(address);
-    System.out.println("queue:" + destination.getQueueName());
-
-    final MessageProducer producer = txSession.createProducer(destination);
-    int index = 0;
-    while (index < 10) {
-       final TextMessage outgoing = txSession.createTextMessage();
-       outgoing.setText("Message:" + index);
-       producer.send(outgoing);
-       index++;
-    }
-    txSession.commit();
-    txSession.close();
-
-    final Queue coreQueue = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-    // The broker applies the commit asynchronously, so poll for the count instead of reading it once.
-    assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> coreQueue.getMessageCount() == 10));
- }
-
- /**
-  * Sends ten messages in a transacted session, rolls back, and checks that the core queue stays empty.
-  */
- @Test
- public void testRollbackProducer() throws Throwable {
-
-    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-    javax.jms.Queue queue = createQueue(address);
-    System.out.println("queue:" + queue.getQueueName());
-    MessageProducer p = session.createProducer(queue);
-    for (int i = 0; i < 10; i++) {
-       TextMessage message = session.createTextMessage();
-       message.setText("Message:" + i);
-       p.send(message);
-    }
-    session.rollback();
-    session.close();
-    Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-    // Expected value goes first so a failure message reads "expected:<0> but was:<n>".
-    Assert.assertEquals(0, q.getMessageCount());
- }
-
- /**
-  * Sends ten messages, consumes them in order inside a transacted session, commits, and checks that the
-  * core queue is empty afterwards.
-  */
- @Test
- public void testCommitConsumer() throws Throwable {
-
-    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    javax.jms.Queue queue = createQueue(address);
-    System.out.println("queue:" + queue.getQueueName());
-    MessageProducer p = session.createProducer(queue);
-    for (int i = 0; i < 10; i++) {
-       TextMessage message = session.createTextMessage();
-       message.setText("Message:" + i);
-       p.send(message);
-    }
-    session.close();
-
-    session = connection.createSession(true, Session.SESSION_TRANSACTED);
-    MessageConsumer cons = session.createConsumer(queue);
-    connection.start();
-
-    for (int i = 0; i < 10; i++) {
-       TextMessage message = (TextMessage) cons.receive(5000);
-       Assert.assertNotNull(message);
-       Assert.assertEquals("Message:" + i, message.getText());
-    }
-    session.commit();
-    session.close();
-    Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-    // Expected value goes first so a failure message reads "expected:<0> but was:<n>".
-    Assert.assertEquals(0, q.getMessageCount());
- }
-
- /**
-  * Sends ten messages, consumes them in order inside a transacted session, rolls back, and waits for the
-  * core queue to report all ten messages again.
-  */
- @Test
- public void testRollbackConsumer() throws Throwable {
-    final Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    final javax.jms.Queue destination = createQueue(address);
-    System.out.println("queue:" + destination.getQueueName());
-
-    final MessageProducer producer = sendSession.createProducer(destination);
-    for (int n = 0; n < 10; n++) {
-       final TextMessage outgoing = sendSession.createTextMessage();
-       outgoing.setText("Message:" + n);
-       producer.send(outgoing);
-    }
-    sendSession.close();
-
-    final Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-    final MessageConsumer consumer = txSession.createConsumer(destination);
-    connection.start();
-
-    for (int n = 0; n < 10; n++) {
-       final TextMessage incoming = (TextMessage) consumer.receive(5000);
-       Assert.assertNotNull(incoming);
-       Assert.assertEquals("Message:" + n, incoming.getText());
-    }
-    txSession.rollback();
-
-    final Queue coreQueue = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-    // The broker applies the rollback asynchronously, so poll for the count instead of reading it once.
-    assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> coreQueue.getMessageCount() == 10));
- }
-
- /**
-  * Round-trips an {@link ObjectMessage} carrying a list with one string and checks the received content.
-  */
- @Test
- public void testObjectMessage() throws Throwable {
-
-    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    javax.jms.Queue queue = createQueue(address);
-    MessageProducer p = session.createProducer(queue);
-    ArrayList<String> list = new ArrayList<>();
-    list.add("aString");
-    ObjectMessage objectMessage = session.createObjectMessage(list);
-    p.send(objectMessage);
-    session.close();
-
-    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    MessageConsumer cons = session.createConsumer(queue);
-    connection.start();
-
-    objectMessage = (ObjectMessage) cons.receive(5000);
-    assertNotNull(objectMessage);
-    // Check the payload type explicitly instead of relying on an unchecked cast.
-    Object payload = objectMessage.getObject();
-    assertTrue(payload instanceof ArrayList);
-    // Expected value goes first so a failure message is reported the right way round.
-    assertEquals("aString", ((ArrayList<?>) payload).get(0));
-    connection.close();
- }
-
- /**
-  * Fills the address {@code address + 1} and checks that its size has reached the reject threshold.
-  */
- @Test
- public void testResourceLimitExceptionOnAddressFull() throws Exception {
-    setAddressFullBlockPolicy();
-
-    final String target = address + 1;
-    fillAddress(target);
-
-    final SimpleString storeName = new SimpleString(target);
-    final long currentSize = server.getPagingManager().getPageStore(storeName).getAddressSize();
-    assertTrue(currentSize >= maxSizeBytesRejectThreshold);
- }
-
- /**
-  * Checks that a producer attached before the address was filled is rejected with a
-  * {@link ResourceAllocationException} mentioning "resource-limit-exceeded" once the address is full.
-  */
- @Test
- public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
-    setAddressFullBlockPolicy();
-
-    final String target = address + 1;
-    final Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    final Destination destination = jmsSession.createQueue(target);
-    final MessageProducer producer = jmsSession.createProducer(destination);
-
-    fillAddress(target);
-
-    ResourceAllocationException rejection = null;
-    try {
-       producer.send(jmsSession.createBytesMessage());
-    } catch (ResourceAllocationException rae) {
-       rejection = rae;
-    }
-    // Stays null, and therefore fails here, when the send was not rejected.
-    assertTrue(rejection != null);
-    assertTrue(rejection.getMessage().contains("resource-limit-exceeded"));
-
-    final long currentSize = server.getPagingManager().getPageStore(new SimpleString(target)).getAddressSize();
-    assertTrue(currentSize >= maxSizeBytesRejectThreshold);
- }
-
- /**
-  * Connects to port 5680, sends with a blocking sender until the address is full, and checks that no
-  * further credit was granted and that the address size lies between the two size limits.
-  */
- @Test(timeout = 10000)
- public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
-    setAddressFullBlockPolicy();
-
-    String destinationAddress = address + 1;
-    AmqpClient client = new AmqpClient(new URI("tcp://localhost:5680"), userName, password);
-    AmqpConnection amqpConnection = client.connect();
-    try {
-       AmqpSession session = amqpConnection.createSession();
-       AmqpSender sender = session.createSender(destinationAddress);
-
-       // Use blocking send to ensure buffered messages do not interfere with credit.
-       sender.setSendTimeout(-1);
-       sendUntilFull(sender);
-
-       // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
-       // assertEquals reports the actual credit on failure, unlike assertTrue(x == -1).
-       assertEquals(-1, sender.getSender().getCredit());
-
-       long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
-       assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- /**
-  * Fills an address, attaches a new sender (which must start with zero credit), drains all but one of the
-  * stored messages and then checks that the sender's credit is no longer negative.
-  */
- @Test
- public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
-    setAddressFullBlockPolicy();
-
-    String destinationAddress = address + 1;
-    fillAddress(destinationAddress);
-
-    AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    AmqpConnection amqpConnection = client.connect();
-    try {
-       AmqpSession session = amqpConnection.createSession();
-       AmqpSender sender = session.createSender(destinationAddress);
-
-       // Wait for a potential flow frame.
-       Thread.sleep(500);
-       assertEquals(0, sender.getSender().getCredit());
-
-       // Empty Address except for 1 message used later.
-       AmqpReceiver receiver = session.createReceiver(destinationAddress);
-       receiver.flow(100);
-
-       for (int i = 0; i < messagesSent - 1; i++) {
-          AmqpMessage m = receiver.receive(5000, TimeUnit.MILLISECONDS);
-          // A timed-out receive yields null; fail with context rather than with a bare NullPointerException.
-          Assert.assertNotNull("no message received at index " + i + " of " + (messagesSent - 1), m);
-          m.accept();
-       }
-
-       // Wait for address to unblock and flow frame to arrive
-       Thread.sleep(500);
-
-       assertTrue(sender.getSender().getCredit() >= 0);
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- /**
-  * Fills an address and checks that a sender attached afterwards still has zero credit one second later.
-  */
- @Test
- public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
-    setAddressFullBlockPolicy();
-    fillAddress(address + 1);
-
-    final AmqpConnection blockedConnection = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password).connect();
-    try {
-       final AmqpSender lateSender = blockedConnection.createSession().createSender(address + 1);
-       // Give a flow frame the chance to arrive before looking at the credit.
-       Thread.sleep(1000);
-       assertEquals(0, lateSender.getSender().getCredit());
-    } finally {
-       blockedConnection.close();
-    }
- }
-
- /**
-  * Attaches a pre-settling sender, fills the address, and checks that a transactional send on that sender
-  * fails with an error naming "resource-limit-exceeded" and the full address.
-  */
- @Test
- public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
-    setAddressFullBlockPolicy();
-
-    // The link is attached before the address is filled so that it has been granted credit.
-    final AmqpClient amqpClient = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    final AmqpConnection amqpConnection = amqpClient.connect();
-    final AmqpSession txSession = amqpConnection.createSession();
-    final AmqpSender presettledSender = txSession.createSender(address);
-    presettledSender.setPresettle(true);
-
-    fillAddress(address);
-
-    final AmqpMessage outgoing = new AmqpMessage();
-    outgoing.setBytes(new byte[50 * 1024]);
-
-    Exception failure = null;
-    try {
-       txSession.begin();
-       presettledSender.send(outgoing);
-       txSession.commit();
-    } catch (Exception e) {
-       failure = e;
-    } finally {
-       amqpConnection.close();
-    }
-
-    assertNotNull(failure);
-    final String reason = failure.getMessage();
-    assertTrue(reason.contains("resource-limit-exceeded"));
-    assertTrue(reason.contains("Address is full: " + address));
- }
-
- /**
-  * Fills an address by sending messages to it over a dedicated connection until a send is rejected.
-  * Careful when using this method: it asserts that a rejection carrying "amqp:resource-limit-exceeded" was
-  * raised, so only use it when rejected messages are switched on.
-  *
-  * @param address name of the address to fill
-  * @throws Exception if the connection cannot be opened or closed
-  */
- private void fillAddress(String address) throws Exception {
-    AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    AmqpConnection amqpConnection = client.connect();
-    Exception exception = null;
-    try {
-       AmqpSession session = amqpConnection.createSession();
-       AmqpSender sender = session.createSender(address);
-       sendUntilFull(sender);
-    } catch (Exception e) {
-       // The rejection is the expected outcome; keep it for the assertions below.
-       exception = e;
-    } finally {
-       amqpConnection.close();
-    }
-
-    // Should receive a rejected error
-    assertNotNull(exception);
-    assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
- }
-
- /**
-  * Sends 50 KiB messages on a background thread until 50 messages were sent, a send fails, or one second
-  * has passed, whichever comes first. The number of completed sends is stored in {@code messagesSent}.
-  * The background thread is interrupted and joined before this method returns.
-  *
-  * @param sender the sender to push messages through
-  * @throws Exception the {@link IOException} raised by a failed send, if one was recorded
-  */
- private void sendUntilFull(final AmqpSender sender) throws Exception {
-    final AmqpMessage message = new AmqpMessage();
-    byte[] payload = new byte[50 * 1024];
-    message.setBytes(payload);
-
-    final int maxMessages = 50;
-    final AtomicInteger sentMessages = new AtomicInteger(0);
-    final Exception[] errors = new Exception[1];
-    final CountDownLatch finished = new CountDownLatch(1);
-
-    Thread t = new Thread(() -> {
-       try {
-          for (int i = 0; i < maxMessages; i++) {
-             sender.send(message);
-             System.out.println("Sent " + i);
-             sentMessages.getAndIncrement();
-          }
-       } catch (IOException e) {
-          errors[0] = e;
-       } finally {
-          // Count down on failure too: the caller then stops waiting as soon as a send is rejected, and
-          // the latch hand-off makes the write to errors[0] visible to the waiting thread.
-          finished.countDown();
-       }
-    });
-
-    try {
-       t.start();
-
-       // A timeout is not an error here: a blocked sender simply never finishes.
-       finished.await(1, TimeUnit.SECONDS);
-
-       messagesSent = sentMessages.get();
-       if (errors[0] != null) {
-          throw errors[0];
-       }
-    } finally {
-       t.interrupt();
-       t.join(1000);
-       Assert.assertFalse(t.isAlive());
-    }
- }
-
- /**
-  * Disables auto-creation for "AnAddressThatDoesNotExist" and checks that attaching a sender to it fails
-  * with an "amqp:not-found" error stating that the target address does not exist.
-  */
- @Test
- public void testLinkDetatchErrorIsCorrectWhenQueueDoesNotExists() throws Exception {
-    AddressSettings value = new AddressSettings();
-    value.setAutoCreateJmsQueues(false);
-    value.setAutoCreateQueues(false);
-    value.setAutoCreateAddresses(false);
-    value.setAutoCreateJmsTopics(false);
-    server.getAddressSettingsRepository().addMatch("AnAddressThatDoesNotExist", value);
-    AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    AmqpConnection amqpConnection = client.connect();
-    // Close the connection on every exit path, including a failed assertion.
-    try {
-       AmqpSession session = amqpConnection.createSession();
-
-       Exception expectedException = null;
-       try {
-          session.createSender("AnAddressThatDoesNotExist");
-          fail("Creating a sender here on an address that doesn't exist should fail");
-       } catch (Exception e) {
-          expectedException = e;
-       }
-
-       assertNotNull(expectedException);
-       assertTrue(expectedException.getMessage().contains("amqp:not-found"));
-       assertTrue(expectedException.getMessage().contains("target address does not exist"));
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- /**
-  * Attaches a receiver to the core queue, destroys that queue on the server and checks that the receiver
-  * ends up closed.
-  */
- @Test
- public void testLinkDetachSentWhenQueueDeleted() throws Exception {
-    final AmqpConnection amqpConnection = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password).connect();
-    try {
-       final AmqpSession amqpSession = amqpConnection.createSession();
-       final AmqpReceiver attachedReceiver = amqpSession.createReceiver(coreAddress);
-
-       final SimpleString queueName = new SimpleString(coreAddress);
-       server.destroyQueue(queueName, null, false, true);
-
-       Wait.waitFor(attachedReceiver::isClosed);
-       assertTrue(attachedReceiver.isClosed());
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- /**
-  * Force-disconnects every connection on the server side and checks that the AMQP client connection is
-  * closed with the CONNECTION_FORCED condition.
-  */
- @Test
- public void testCloseIsSentOnConnectionClose() throws Exception {
-    // Close the shared JMS connection first, before the server-side connections are enumerated.
-    connection.close();
-
-    final AmqpClient amqpClient = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    final AmqpConnection amqpConnection = amqpClient.connect();
-    try {
-       for (RemotingConnection remoting : server.getRemotingService().getConnections()) {
-          server.getRemotingService().removeConnection(remoting);
-          remoting.disconnect(true);
-       }
-
-       Wait.waitFor(amqpConnection::isClosed);
-       assertTrue(amqpConnection.isClosed());
-
-       final Object remoteCondition = amqpConnection.getConnection().getRemoteCondition().getCondition();
-       assertEquals(AmqpSupport.CONNECTION_FORCED, remoteCondition);
-    } finally {
-       amqpConnection.close();
-    }
- }
-
-
- /**
-  * Attaches a durable receiver named "testSub" from container "testClient" to "mytopic" and checks that the
-  * server holds a queue named "testClient.testSub:mytopic".
-  */
- @Test
- public void testClientIdIsSetInSubscriptionList() throws Exception {
-    AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-    server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString("mytopic"), RoutingType.ANYCAST));
-    AmqpConnection amqpConnection = client.createConnection();
-    amqpConnection.setContainerId("testClient");
-    amqpConnection.setOfferedCapabilities(Arrays.asList(Symbol.getSymbol("topic")));
-    amqpConnection.connect();
-    // No catch block: an exception while attaching must fail the test instead of being printed and ignored.
-    try {
-       AmqpSession session = amqpConnection.createSession();
-
-       Source source = new Source();
-       source.setDurable(TerminusDurability.UNSETTLED_STATE);
-       source.setCapabilities(Symbol.getSymbol("topic"));
-       source.setAddress("mytopic");
-       session.createReceiver(source, "testSub");
-
-       SimpleString fo = new SimpleString("testClient.testSub:mytopic");
-       assertNotNull(server.locateQueue(fo));
-    } finally {
-       amqpConnection.close();
-    }
- }
-
- @Test
- public void testSendingAndReceivingToQueueWithDifferentAddressAndQueueName() throws Exception {
-
- String queueName = "TestQueueName";
- String address = "TestAddress";
- server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(address), RoutingType.ANYCAST));
- server.createQueue(new SimpleString(address), RoutingType.ANYCAST, new SimpleString(queueName), null, true, false);
-
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection amqpConnection = client.connect();
- AmqpSession session = amqpConnection.createSession();
- AmqpSender sender = session.createSender(address);
- AmqpReceiver receiver = session.createReceiver(address);
- receiver.flow(1);
-
- AmqpMessage message = new AmqpMessage();
- message.setText("TestPayload");
- sender.send(message);
-
- AmqpMessage receivedMessage = receiver.receive(5000, TimeUnit.MILLISECONDS);
- assertNotNull(receivedMessage);
- amqpConnection.close();
- }
-
- @Test
- public void testReplyTo() throws Throwable {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
- MessageProducer p = session.createProducer(queue);
-
- TextMessage message = session.createTextMessage();
- message.setText("Message temporary");
- message.setJMSReplyTo(createQueue(address));
- p.send(message);
-
- MessageConsumer cons = session.createConsumer(queue);
- connection.start();
-
- message = (TextMessage) cons.receive(5000);
- assertNotNull(message);
- Destination jmsReplyTo = message.getJMSReplyTo();
- Assert.assertNotNull(jmsReplyTo);
- Assert.assertNotNull(message);
- }
-
- @Test
- public void testReplyToNonJMS() throws Throwable {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- TemporaryQueue queue = session.createTemporaryQueue();
- System.out.println("queue:" + queue.getQueueName());
- MessageProducer p = session.createProducer(queue);
-
- TextMessage message = session.createTextMessage();
- message.setText("Message temporary");
- message.setJMSReplyTo(createQueue("someaddress"));
- p.send(message);
-
- MessageConsumer cons = session.createConsumer(queue);
- connection.start();
-
- message = (TextMessage) cons.receive(5000);
- Destination jmsReplyTo = message.getJMSReplyTo();
- Assert.assertNotNull(jmsReplyTo);
- Assert.assertNotNull(message);
-
- }
-
- @Test
- public void testOutboundConnection() throws Throwable {
- final ActiveMQServer remote = createAMQPServer(5673);
- remote.start();
- try {
- Wait.waitFor(remote::isActive);
- } catch (Exception e) {
- remote.stop();
- throw e;
- }
-
- final Map<String, Object> config = new LinkedHashMap<>();
- config.put(TransportConstants.HOST_PROP_NAME, "localhost");
- config.put(TransportConstants.PORT_PROP_NAME, "5673");
- ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
- ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
- NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
- connector.start();
- connector.createConnection();
-
- try {
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisfied() throws Exception {
- return remote.getConnectionCount() > 0;
- }
- });
- assertEquals(1, remote.getConnectionCount());
-
- lifeCycleListener.stop();
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisfied() throws Exception {
- return remote.getConnectionCount() == 0;
- }
- });
- assertEquals(0, remote.getConnectionCount());
- } finally {
- lifeCycleListener.stop();
- remote.stop();
- }
- }
-
- /*
- // Uncomment testLoopBrowser to validate the hunging on the test
- @Test
- public void testLoopBrowser() throws Throwable {
- for (int i = 0 ; i < 1000; i++) {
- System.out.println("#test " + i);
- testBrowser();
- tearDown();
- setUp();
- }
- } */
-
- /**
- * This test eventually fails because of: https://issues.apache.org/jira/browse/QPID-4901
- *
- * @throws Throwable
- */
- //@Test // TODO: re-enable this when we can get a version free of QPID-4901 bug
- public void testBrowser() throws Throwable {
-
- boolean success = false;
-
- for (int i = 0; i < 10; i++) {
- // As this test was hunging, we added a protection here to fail it instead.
- // it seems something on the qpid client, so this failure belongs to them and we can ignore it on
- // our side (ActiveMQ)
- success = runWithTimeout(new RunnerWithEX() {
- @Override
- public void run() throws Throwable {
- int numMessages = 50;
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("msg:" + i);
- p.send(message);
- }
-
- connection.close();
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-
- connection = createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- QueueBrowser browser = session.createBrowser(queue);
- Enumeration enumeration = browser.getEnumeration();
- int count = 0;
- while (enumeration.hasMoreElements()) {
- Message msg = (Message) enumeration.nextElement();
- Assert.assertNotNull("" + count, msg);
- Assert.assertTrue("" + msg, msg instanceof TextMessage);
- String text = ((TextMessage) msg).getText();
- Assert.assertEquals(text, "msg:" + count++);
- }
- Assert.assertEquals(count, numMessages);
- connection.close();
- Assert.assertEquals(getMessageCount(q), numMessages);
- }
- }, 5000);
-
- if (success) {
- break;
- } else {
- System.err.println("Had to make it fail!!!");
- tearDown();
- setUp();
- }
- }
-
- // There is a bug on the qpid client library currently, we can expect having to interrupt the thread on browsers.
- // but we can't have it on 10 iterations... something must be broken if that's the case
- Assert.assertTrue("Test had to interrupt on all occasions.. this is beyond the expected for the test", success);
- }
-
- @Test
- public void testConnection() throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = session.createConsumer(createQueue(address));
-
- org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(coreAddress));
-
- assertEquals(1, serverQueue.getConsumerCount());
-
- cons.close();
-
- for (int i = 0; i < 100 && serverQueue.getConsumerCount() != 0; i++) {
- Thread.sleep(500);
- }
-
- assertEquals(0, serverQueue.getConsumerCount());
-
- session.close();
-
- }
-
- @Test
- public void testMessagesSentTransactional() throws Exception {
- int numMessages = 1000;
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer p = session.createProducer(queue);
- byte[] bytes = new byte[2048];
- new Random().nextBytes(bytes);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("msg:" + i);
- p.send(message);
- }
- session.commit();
- connection.close();
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
- for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
- Thread.sleep(1);
- }
- Assert.assertEquals(numMessages, getMessageCount(q));
- }
-
- @Test
- public void testMessagesSentTransactionalRolledBack() throws Exception {
- int numMessages = 1;
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageProducer p = session.createProducer(queue);
- byte[] bytes = new byte[2048];
- new Random().nextBytes(bytes);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("msg:" + i);
- p.send(message);
- }
- session.close();
- connection.close();
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
- Assert.assertEquals(getMessageCount(q), 0);
- }
-
- @Test
- public void testCancelMessages() throws Exception {
- int numMessages = 10;
- long time = System.currentTimeMillis();
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- byte[] bytes = new byte[2048];
- new Random().nextBytes(bytes);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("msg:" + i);
- p.send(message);
- }
- connection.close();
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-
- for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
- Thread.sleep(1);
- }
-
- Assert.assertEquals(numMessages, getMessageCount(q));
- //now create a new connection and receive
- connection = createConnection();
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
- Thread.sleep(100);
- consumer.close();
- connection.close();
- Assert.assertEquals(numMessages, getMessageCount(q));
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testClientAckMessages() throws Exception {
- int numMessages = 10;
- long time = System.currentTimeMillis();
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- byte[] bytes = new byte[2048];
- new Random().nextBytes(bytes);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("msg:" + i);
- p.send(message);
- }
- connection.close();
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-
- for (long timeout = System.currentTimeMillis() + 5000; timeout > System.currentTimeMillis() && getMessageCount(q) != numMessages; ) {
- Thread.sleep(1);
- }
- Assert.assertEquals(numMessages, getMessageCount(q));
- //now create a new connection and receive
- connection = createConnection();
- session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
- for (int i = 0; i < numMessages; i++) {
- Message msg = consumer.receive(5000);
- if (msg == null) {
- System.out.println("ProtonTest.testManyMessages");
- }
- Assert.assertNotNull("" + i, msg);
- Assert.assertTrue("" + msg, msg instanceof TextMessage);
- String text = ((TextMessage) msg).getText();
- //System.out.println("text = " + text);
- Assert.assertEquals(text, "msg:" + i);
- msg.acknowledge();
- }
-
- consumer.close();
- connection.close();
-
- // Wait for Acks to be processed and message removed from queue.
- Thread.sleep(500);
-
- Assert.assertEquals(0, getMessageCount(q));
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testMessagesReceivedInParallel() throws Throwable {
- final int numMessages = 50000;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- final ArrayList<Throwable> exceptions = new ArrayList<>();
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- Connection connectionConsumer = null;
- try {
- // TODO the test may starve if using the same connection (dead lock maybe?)
- connectionConsumer = createConnection();
- // connectionConsumer = connection;
- connectionConsumer.start();
- Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- long n = 0;
- int count = numMessages;
- while (count > 0) {
- try {
- if (++n % 1000 == 0) {
- System.out.println("received " + n + " messages");
- }
-
- Message m = consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + count + " on consumer", m);
- count--;
- } catch (JMSException e) {
- e.printStackTrace();
- break;
- }
- }
- } catch (Throwable e) {
- exceptions.add(e);
- e.printStackTrace();
- } finally {
- try {
- // if the createconnecion wasn't commented out
- if (connectionConsumer != connection) {
- connectionConsumer.close();
- }
- } catch (Throwable ignored) {
- // NO OP
- }
- }
- }
- });
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- t.start();
-
- MessageProducer p = session.createProducer(queue);
- p.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- for (int i = 0; i < numMessages; i++) {
- BytesMessage message = session.createBytesMessage();
- message.writeUTF("Hello world!!!!" + i);
- message.setIntProperty("count", i);
- p.send(message);
- }
- t.join();
-
- for (Throwable e : exceptions) {
- throw e;
- }
- Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
-
- connection.close();
- Assert.assertEquals(0, getMessageCount(q));
-
- long taken = (System.currentTimeMillis() - time);
- System.out.println("Microbenchamrk ran in " + taken + " milliseconds, sending/receiving " + numMessages);
-
- double messagesPerSecond = ((double) numMessages / (double) taken) * 1000;
-
- System.out.println(((int) messagesPerSecond) + " messages per second");
-
- }
-
- @Test
- public void testSimpleBinary() throws Throwable {
- final int numMessages = 500;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- byte[] bytes = new byte[0xf + 1];
- for (int i = 0; i <= 0xf; i++) {
- bytes[i] = (byte) i;
- }
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- System.out.println("Sending " + i);
- BytesMessage message = session.createBytesMessage();
-
- message.writeBytes(bytes);
- message.setIntProperty("count", i);
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- BytesMessage m = (BytesMessage) consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
-
- m.reset();
-
- long size = m.getBodyLength();
- byte[] bytesReceived = new byte[(int) size];
- m.readBytes(bytesReceived);
-
- System.out.println("Received " + ByteUtil.bytesToHex(bytesReceived, 1) + " count - " + m.getIntProperty("count"));
-
- Assert.assertArrayEquals(bytes, bytesReceived);
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testSimpleDefault() throws Throwable {
- final int numMessages = 500;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- byte[] bytes = new byte[0xf + 1];
- for (int i = 0; i <= 0xf; i++) {
- bytes[i] = (byte) i;
- }
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- System.out.println("Sending " + i);
- Message message = session.createMessage();
-
- message.setIntProperty("count", i);
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- Message m = consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testSimpleMap() throws Throwable {
- final int numMessages = 100;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- System.out.println("Sending " + i);
- MapMessage message = session.createMapMessage();
-
- message.setInt("i", i);
- message.setIntProperty("count", i);
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- MapMessage m = (MapMessage) consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
-
- Assert.assertEquals(i, m.getInt("i"));
- Assert.assertEquals(i, m.getIntProperty("count"));
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testSimpleStream() throws Throwable {
- final int numMessages = 100;
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- StreamMessage message = session.createStreamMessage();
- message.writeInt(i);
- message.writeBoolean(true);
- message.writeString("test");
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- StreamMessage m = (StreamMessage) consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
-
- Assert.assertEquals(i, m.readInt());
- Assert.assertEquals(true, m.readBoolean());
- Assert.assertEquals("test", m.readString());
- }
-
- }
-
- @Test
- public void testSimpleText() throws Throwable {
- final int numMessages = 100;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- System.out.println("Sending " + i);
- TextMessage message = session.createTextMessage("text" + i);
- message.setStringProperty("text", "text" + i);
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- TextMessage m = (TextMessage) consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", m);
- Assert.assertEquals("text" + i, m.getText());
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testTimedOutWaitingForWriteLogOnConsumer() throws Throwable {
- String name = "exampleQueue1";
-
- int numMessages = 50;
-
- System.out.println("1. Send messages into queue");
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Queue queue = session.createQueue(name);
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- TextMessage message = session.createTextMessage();
- message.setText("Message temporary");
- p.send(message);
- }
- p.close();
- session.close();
-
- System.out.println("2. Receive one by one, each in its own session");
- for (int i = 0; i < numMessages; i++) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(name);
- MessageConsumer c = session.createConsumer(queue);
- Message m = c.receive(1000);
- p.close();
- session.close();
- }
-
- System.out.println("3. Try to receive 10 in the same session");
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(name);
- MessageConsumer c = session.createConsumer(queue);
- for (int i = 0; i < numMessages; i++) {
- Message m = c.receive(1000);
- }
- p.close();
- session.close();
- }
-
- @Test
- public void testSimpleObject() throws Throwable {
- final int numMessages = 1;
- long time = System.currentTimeMillis();
- final javax.jms.Queue queue = createQueue(address);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < numMessages; i++) {
- System.out.println("Sending " + i);
- ObjectMessage message = session.createObjectMessage(new AnythingSerializable(i));
- p.send(message);
- }
-
- Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final MessageConsumer consumer = sessionConsumer.createConsumer(queue);
-
- for (int i = 0; i < numMessages; i++) {
- ObjectMessage msg = (ObjectMessage) consumer.receive(5000);
- Assert.assertNotNull("Could not receive message count=" + i + " on consumer", msg);
-
- AnythingSerializable someSerialThing = (AnythingSerializable) msg.getObject();
- Assert.assertEquals(i, someSerialThing.getCount());
- }
-
- // assertEquals(0, q.getMessageCount());
- long taken = (System.currentTimeMillis() - time) / 1000;
- System.out.println("taken = " + taken);
- }
-
- @Test
- public void testSelector() throws Exception {
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- message.setText("msg:0");
- p.send(message);
- message = session.createTextMessage();
- message.setText("msg:1");
- message.setStringProperty("color", "RED");
- p.send(message);
- connection.start();
- MessageConsumer messageConsumer = session.createConsumer(queue, "color = 'RED'");
- TextMessage m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- Assert.assertEquals("msg:1", m.getText());
- Assert.assertEquals(m.getStringProperty("color"), "RED");
- connection.close();
- }
-
- @Test
- public void testProperties() throws Exception {
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- message.setText("msg:0");
- message.setBooleanProperty("true", true);
- message.setBooleanProperty("false", false);
- message.setStringProperty("foo", "bar");
- message.setDoubleProperty("double", 66.6);
- message.setFloatProperty("float", 56.789f);
- message.setIntProperty("int", 8);
- message.setByteProperty("byte", (byte) 10);
- p.send(message);
- p.send(message);
- connection.start();
- MessageConsumer messageConsumer = session.createConsumer(queue);
- TextMessage m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- Assert.assertEquals("msg:0", m.getText());
- Assert.assertEquals(m.getBooleanProperty("true"), true);
- Assert.assertEquals(m.getBooleanProperty("false"), false);
- Assert.assertEquals(m.getStringProperty("foo"), "bar");
- Assert.assertEquals(m.getDoubleProperty("double"), 66.6, 0.0001);
- Assert.assertEquals(m.getFloatProperty("float"), 56.789f, 0.0001);
- Assert.assertEquals(m.getIntProperty("int"), 8);
- Assert.assertEquals(m.getByteProperty("byte"), (byte) 10);
- m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- connection.close();
- }
-
- @Test
- public void testClientID() throws Exception {
- Connection testConn1 = createConnection(false);
- Connection testConn2 = createConnection(false);
- try {
- testConn1.setClientID("client-id1");
- try {
- testConn1.setClientID("client-id2");
- fail("didn't get expected exception");
- } catch (javax.jms.IllegalStateException e) {
- //expected
- }
-
- try {
- testConn2.setClientID("client-id1");
- fail("didn't get expected exception");
- } catch (InvalidClientIDException e) {
- //expected
- }
- } finally {
- testConn1.close();
- testConn2.close();
- }
-
- try {
- testConn1 = createConnection(false);
- testConn2 = createConnection(false);
- testConn1.setClientID("client-id1");
- testConn2.setClientID("client-id2");
- } finally {
- testConn1.close();
- testConn2.close();
- }
- }
-
- @Test
- public void testFilterJMSMessageID() throws Exception {
- javax.jms.Queue queue = createQueue(address);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- TextMessage message = session.createTextMessage();
- p.send(message);
- System.out.println("get mid: " + message.getJMSMessageID());
- connection.start();
- MessageConsumer messageConsumer = session.createConsumer(queue, "JMSMessageID = '" + message.getJMSMessageID() + "'");
- TextMessage m = (TextMessage) messageConsumer.receive(5000);
- Assert.assertNotNull(m);
- assertEquals(message.getJMSMessageID(), m.getJMSMessageID());
- connection.close();
- }
-
- @Test
- public void testProducerWithoutUsingDefaultDestination() throws Exception {
-
- try {
- javax.jms.Queue queue = createQueue(coreAddress);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(null);
-
- for (int i = 1; i <= 10; i++) {
- String targetName = coreAddress + i;
- javax.jms.Queue target = createQueue(targetName);
- TextMessage message = session.createTextMessage("message for " + targetName);
- p.send(target, message);
- }
- connection.start();
- MessageConsumer messageConsumer = session.createConsumer(queue);
- Message m = messageConsumer.receive(200);
- Assert.assertNull(m);
-
- for (int i = 1; i <= 10; i++) {
- String targetName = coreAddress + i;
- javax.jms.Queue target = createQueue(targetName);
- MessageConsumer consumer = session.createConsumer(target);
- TextMessage tm = (TextMessage) consumer.receive(2000);
- assertNotNull(tm);
- assertEquals("message for " + targetName, tm.getText());
- consumer.close();
- }
- } finally {
- connection.close();
- }
- }
-
- private javax.jms.Queue createQueue(String address) throws Exception {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- return session.createQueue(address);
- } finally {
- session.close();
- }
- }
-
- private Connection createConnection() throws JMSException {
- return this.createConnection(true);
- }
-
- private javax.jms.Connection createConnection(boolean isStart) throws JMSException {
- Connection connection;
- if (protocol == 3) {
- factory = new JmsConnectionFactory(amqpConnectionUri);
- connection = factory.createConnection();
- } else if (protocol == 0) {
- factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
- connection = factory.createConnection();
- } else {
- Assert.fail("protocol = " + protocol + " not supported");
- return null; // just to compile, the previous statement will throw an exception
- }
- if (isStart) {
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.start();
- }
-
- return connection;
- }
-
- private javax.jms.Connection createConnection(String clientId) throws JMSException {
- Connection connection;
- if (protocol == 3) {
- factory = new JmsConnectionFactory(amqpConnectionUri);
- connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.setClientID(clientId);
- connection.start();
- } else if (protocol == 0) {
- factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
- connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.setClientID(clientId);
- connection.start();
- } else {
- Assert.fail("protocol = " + protocol + " not supported");
- return null; // just to compile, the previous statement will throw an exception
- }
-
- return connection;
- }
-
- private void setAddressFullBlockPolicy() {
- // For BLOCK tests
- AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
- addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- addressSettings.setMaxSizeBytes(maxSizeBytes);
- addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
- server.getAddressSettingsRepository().addMatch("#", addressSettings);
- }
-
- public static class AnythingSerializable implements Serializable {
-
- private int count;
-
- public AnythingSerializable(int count) {
- this.count = count;
- }
-
- public int getCount() {
- return count;
- }
- }
-
- /**
- * If we have an address configured with both ANYCAST and MULTICAST routing types enabled, we must ensure that any
- * messages sent specifically to MULTICAST (e.g. JMS TopicProducer) are only delivered to MULTICAST queues (e.g.
- * i.e. subscription queues) and **NOT** to ANYCAST queues (e.g. JMS Queue).
- *
- * @throws Exception
- */
- @Test
- public void testRoutingExclusivity() throws Exception {
-
- // Create Address with both ANYCAST and MULTICAST enabled
- String testAddress = "testRoutingExclusivity";
- SimpleString ssTestAddress = new SimpleString(testAddress);
-
- AddressInfo addressInfo = new AddressInfo(ssTestAddress);
- addressInfo.addRoutingType(RoutingType.MULTICAST);
- addressInfo.addRoutingType(RoutingType.ANYCAST);
-
- server.addAddressInfo(addressInfo);
- server.createQueue(ssTestAddress, RoutingType.ANYCAST, ssTestAddress, null, true, false);
-
- Connection connection = createConnection(UUIDGenerator.getInstance().generateStringUUID());
-
- try {
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- Topic topic = session.createTopic(testAddress);
- javax.jms.Queue queue = session.createQueue(testAddress);
-
- MessageProducer producer = session.createProducer(topic);
-
- MessageConsumer queueConsumer = session.createConsumer(queue);
- MessageConsumer topicConsumer = session.createConsumer(topic);
-
- producer.send(session.createTextMessage("testMessage"));
-
- assertNotNull(topicConsumer.receive(1000));
- assertNull(queueConsumer.receive(1000));
- } finally {
- connection.close();
- }
- }
-
- @Test
- public void testReleaseDisposition() throws Exception {
- AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection connection = client.connect();
- try {
- AmqpSession session = connection.createSession();
-
- AmqpSender sender = session.createSender(address);
- AmqpMessage message = new AmqpMessage();
- message.setText("Test-Message");
- sender.send(message);
-
- AmqpReceiver receiver = session.createReceiver(address);
- receiver.flow(10);
-
- AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(m1);
- m1.release();
-
- //receiver.flow(10);
- AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(m2);
- m2.accept();
- } finally {
- connection.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
deleted file mode 100644
index ab8d2d3..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestBase.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.After;
-import org.junit.Before;
-
-/**
- * Base class for AMQP integration tests. Before each test it creates and
- * starts a broker that exposes a single AMQP acceptor; after each test the
- * broker is stopped again.
- *
- * <p>Subclasses can adjust the broker through {@link #configureServer(Configuration)}
- * and the acceptor's extra properties through {@link #configureAmqp(Map)}.
- */
-public class ProtonTestBase extends ActiveMQTestBase {
-
-   /** Port shared by the acceptor created in {@link #setUp()} and the default connection URI. */
-   private static final int DEFAULT_AMQP_PORT = 5672;
-
-   protected String brokerName = "localhost";
-   protected ActiveMQServer server;
-
-   protected String tcpAmqpConnectionUri = "tcp://localhost:" + DEFAULT_AMQP_PORT;
-   protected String userName = "guest";
-   protected String password = "guest";
-
-   /**
-    * Creates the AMQP broker on the default port and starts it.
-    *
-    * @throws Exception if the parent set-up, server creation or start-up fails
-    */
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-
-      server = this.createAMQPServer(DEFAULT_AMQP_PORT);
-      server.start();
-   }
-
-   /**
-    * Builds (but does not start) a server whose only acceptor is a Netty
-    * acceptor named {@code "amqp-acceptor"} restricted to the AMQP protocol.
-    *
-    * @param port TCP port the acceptor listens on; it is also appended to the
-    *             journal, bindings and paging directory names
-    * @return the configured, not yet started server
-    * @throws Exception if the underlying server cannot be created
-    */
-   protected ActiveMQServer createAMQPServer(int port) throws Exception {
-      final ActiveMQServer amqpServer = this.createServer(true, true);
-      final Configuration config = amqpServer.getConfiguration();
-
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port));
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
-
-      // Extra acceptor properties are left to the subclass hook.
-      Map<String, Object> amqpParams = new HashMap<>();
-      configureAmqp(amqpParams);
-
-      // Replace whatever acceptors createServer() set up with the AMQP one only.
-      config.getAcceptorConfigurations().clear();
-      config.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "amqp-acceptor", amqpParams));
-
-      config.setName(brokerName);
-
-      // NOTE(review): the port suffix presumably keeps servers created on
-      // different ports from sharing storage directories - confirm.
-      config.setJournalDirectory(config.getJournalDirectory() + port);
-      config.setBindingsDirectory(config.getBindingsDirectory() + port);
-      config.setPagingDirectory(config.getPagingDirectory() + port);
-
-      // Default Page: every address ("#") pages when it is full.
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      config.getAddressesSettings().put("#", addressSettings);
-
-      configureServer(config);
-      return amqpServer;
-   }
-
-   /**
-    * Hook invoked last in {@link #createAMQPServer(int)}; the default does nothing.
-    *
-    * @param serverConfig configuration of the server being created
-    */
-   protected void configureServer(Configuration serverConfig) {
-   }
-
-   /**
-    * Hook for populating the acceptor's extra properties; the default adds none.
-    *
-    * @param params mutable map passed to the acceptor's {@link TransportConfiguration}
-    */
-   protected void configureAmqp(Map<String, Object> params) {
-   }
-
-   /**
-    * Stops the broker and then runs the parent tear-down.
-    *
-    * @throws Exception if stopping the server or the parent tear-down fails
-    */
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      try {
-         // setUp() may have failed before the server was assigned; JUnit still
-         // runs @After in that case, so avoid piling an NPE on the real error.
-         if (server != null) {
-            server.stop();
-         }
-      } finally {
-         super.tearDown();
-      }
-   }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bad6acb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
deleted file mode 100644
index dffc12e..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTestForHeader.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.amqp;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
-import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.Wait;
-import org.fusesource.hawtbuf.Buffer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Talks to the broker's AMQP acceptor over a plain socket and checks how it
- * reacts to a raw AMQP protocol header.
- *
- * <p>The broker is started with security enabled and a single user
- * ({@code auser}) registered after start-up.
- */
-public class ProtonTestForHeader extends ActiveMQTestBase {
-
-   /** Port of the AMQP acceptor added in {@link #setUp()} and used by the test client. */
-   private static final int AMQP_PORT = 5672;
-
-   private ActiveMQServer server;
-
-   /**
-    * Starts a security-enabled broker with an additional AMQP acceptor.
-    *
-    * @throws Exception if the parent set-up or broker start-up fails
-    */
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      super.setUp();
-      server = this.createServer(true, true);
-      HashMap<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT));
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, "AMQP");
-      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
-      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
-      server.getConfiguration().setSecurityEnabled(true);
-      server.start();
-      ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager();
-      securityManager.getConfiguration().addUser("auser", "pass");
-   }
-
-   /**
-    * Stops the broker and then runs the parent tear-down.
-    *
-    * @throws Exception if stopping the server or the parent tear-down fails
-    */
-   @Override
-   @After
-   public void tearDown() throws Exception {
-      try {
-         // setUp() may have failed before the server was assigned.
-         if (server != null) {
-            server.stop();
-         }
-      } finally {
-         super.tearDown();
-      }
-   }
-
-   /**
-    * Sends an {@code AMQP 0 1.0.0} header, expects the broker to answer with a
-    * header of its own and then to drop the connection, which is detected by
-    * repeated sends eventually failing.
-    *
-    * <p>NOTE(review): the broker presumably closes the connection because
-    * security is enabled and this header does not start a SASL exchange -
-    * confirm against the AMQP protocol handler.
-    *
-    * @throws Exception if the socket cannot be opened or the exchange fails
-    */
-   @Test
-   public void testSimpleBytes() throws Exception {
-      final AmqpHeader header = new AmqpHeader();
-
-      header.setProtocolId(0);
-      header.setMajor(1);
-      header.setMinor(0);
-      header.setRevision(0);
-
-      // try-with-resources releases the socket even when an assertion fails.
-      try (ClientConnection connection = new ClientConnection()) {
-         connection.open("localhost", AMQP_PORT);
-         connection.send(header);
-
-         AmqpHeader response = connection.readAmqpHeader();
-         assertNotNull(response);
-         IntegrationTestLogger.LOGGER.info("Broker responded with: " + response);
-
-         assertTrue("Broker should have closed client connection", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisfied() throws Exception {
-               try {
-                  connection.send(header);
-                  return false;
-               } catch (Exception expected) {
-                  // A failing send is the signal that the broker closed the socket.
-                  return true;
-               }
-            }
-         }, TimeUnit.SECONDS.toMillis(15), TimeUnit.MILLISECONDS.toMillis(250)));
-      }
-   }
-
-   /**
-    * Minimal blocking socket client that can write and read AMQP headers.
-    */
-   private static final class ClientConnection implements AutoCloseable {
-
-      /** Socket read timeout in milliseconds applied by {@link #readAmqpHeader()}. */
-      private static final int RECEIVE_TIMEOUT = 10000;
-
-      private Socket clientSocket;
-
-      /**
-       * Connects to the given endpoint with {@code TCP_NODELAY} enabled.
-       *
-       * @param host host name to connect to
-       * @param port TCP port to connect to
-       * @throws IOException if the connection cannot be established
-       */
-      public void open(String host, int port) throws IOException {
-         clientSocket = new Socket(host, port);
-         clientSocket.setTcpNoDelay(true);
-      }
-
-      /**
-       * Writes the header's bytes to the socket and flushes them.
-       *
-       * @param header header to transmit
-       * @throws Exception if writing to the socket fails
-       */
-      public void send(AmqpHeader header) throws Exception {
-         IntegrationTestLogger.LOGGER.info("Client sending header: " + header);
-         OutputStream outputStream = clientSocket.getOutputStream();
-         header.getBuffer().writeTo(outputStream);
-         outputStream.flush();
-      }
-
-      /**
-       * Reads one 8-byte AMQP header from the socket.
-       *
-       * @return the parsed header, or {@code null} if the stream ended before
-       *         all eight bytes arrived
-       * @throws Exception if reading fails or times out, or if the received
-       *         bytes do not start with the AMQP prefix
-       */
-      public AmqpHeader readAmqpHeader() throws Exception {
-         clientSocket.setSoTimeout(RECEIVE_TIMEOUT);
-         InputStream is = clientSocket.getInputStream();
-
-         // A single read() may legally return fewer bytes than requested, so
-         // keep reading until the header is complete or the stream ends.
-         byte[] header = new byte[AmqpHeader.HEADER_LENGTH];
-         int filled = 0;
-         while (filled < header.length) {
-            int read = is.read(header, filled, header.length - filled);
-            if (read < 0) {
-               return null;
-            }
-            filled += read;
-         }
-         return new AmqpHeader(new Buffer(header));
-      }
-
-      /**
-       * Closes the socket if {@link #open(String, int)} succeeded.
-       *
-       * @throws IOException if closing the socket fails
-       */
-      @Override
-      public void close() throws IOException {
-         if (clientSocket != null) {
-            clientSocket.close();
-         }
-      }
-   }
-
-   /**
-    * Mutable view over the eight bytes of an AMQP protocol header:
-    * {@code 'A','M','Q','P'}, protocol id, major, minor, revision.
-    */
-   private static final class AmqpHeader {
-
-      /** Number of bytes in an AMQP protocol header. */
-      static final int HEADER_LENGTH = 8;
-
-      private static final Buffer PREFIX = new Buffer(new byte[]{'A', 'M', 'Q', 'P'});
-
-      private Buffer buffer;
-
-      /** Creates the header {@code AMQP 0 1.0.0}. */
-      AmqpHeader() {
-         this(new Buffer(new byte[]{'A', 'M', 'Q', 'P', 0, 1, 0, 0}));
-      }
-
-      /**
-       * Wraps the given bytes, validating the prefix.
-       *
-       * @param buffer eight header bytes
-       */
-      AmqpHeader(Buffer buffer) {
-         this(buffer, true);
-      }
-
-      /**
-       * Wraps the given bytes.
-       *
-       * @param buffer   eight header bytes
-       * @param validate whether the {@code AMQP} prefix must be present
-       */
-      AmqpHeader(Buffer buffer, boolean validate) {
-         setBuffer(buffer, validate);
-      }
-
-      /** @return byte 4 of the header as an unsigned value */
-      public int getProtocolId() {
-         return buffer.get(4) & 0xFF;
-      }
-
-      /** @param value new protocol id; only the low eight bits are stored */
-      public void setProtocolId(int value) {
-         buffer.data[buffer.offset + 4] = (byte) value;
-      }
-
-      /** @return byte 5 of the header as an unsigned value */
-      public int getMajor() {
-         return buffer.get(5) & 0xFF;
-      }
-
-      /** @param value new major version; only the low eight bits are stored */
-      public void setMajor(int value) {
-         buffer.data[buffer.offset + 5] = (byte) value;
-      }
-
-      /** @return byte 6 of the header as an unsigned value */
-      public int getMinor() {
-         return buffer.get(6) & 0xFF;
-      }
-
-      /** @param value new minor version; only the low eight bits are stored */
-      public void setMinor(int value) {
-         buffer.data[buffer.offset + 6] = (byte) value;
-      }
-
-      /** @return byte 7 of the header as an unsigned value */
-      public int getRevision() {
-         return buffer.get(7) & 0xFF;
-      }
-
-      /** @param value new revision; only the low eight bits are stored */
-      public void setRevision(int value) {
-         buffer.data[buffer.offset + 7] = (byte) value;
-      }
-
-      /** @return the buffer backing this header */
-      public Buffer getBuffer() {
-         return buffer;
-      }
-
-      /**
-       * Replaces the header bytes, validating the prefix.
-       *
-       * @param value eight header bytes
-       */
-      public void setBuffer(Buffer value) {
-         setBuffer(value, true);
-      }
-
-      /**
-       * Replaces the header bytes.
-       *
-       * @param value    eight header bytes
-       * @param validate whether the {@code AMQP} prefix must be present; the
-       *                 length check is applied regardless of this flag
-       * @throws IllegalArgumentException if the length is not eight, or if
-       *         {@code validate} is set and the prefix is missing
-       */
-      public void setBuffer(Buffer value, boolean validate) {
-         // Parentheses spell out the precedence the original relied on: the
-         // prefix check is optional, the length check is not.
-         if ((validate && !value.startsWith(PREFIX)) || value.length() != HEADER_LENGTH) {
-            throw new IllegalArgumentException("Not an AMQP header buffer");
-         }
-         buffer = value.buffer();
-      }
-
-      /** @return {@code true} if the header starts with {@code 'A','M','Q','P'} */
-      public boolean hasValidPrefix() {
-         return buffer.startsWith(PREFIX);
-      }
-
-      /**
-       * Renders letters as themselves and every other byte as a comma followed
-       * by its numeric value, e.g. {@code AMQP,0,1,0,0}.
-       */
-      @Override
-      public String toString() {
-         StringBuilder builder = new StringBuilder();
-         for (int i = 0; i < buffer.length(); ++i) {
-            char value = (char) buffer.get(i);
-            if (Character.isLetter(value)) {
-               builder.append(value);
-            } else {
-               builder.append(",");
-               builder.append((int) value);
-            }
-         }
-         return builder.toString();
-      }
-   }
-}