You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/11/29 16:52:14 UTC
[2/3] activemq-artemis git commit: ARTEMIS-1529 Adding test on
durable topics
ARTEMIS-1529 Adding test on durable topics
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dbb3aadd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dbb3aadd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dbb3aadd
Branch: refs/heads/master
Commit: dbb3aaddf65f07c5ef77ace3a7efeb3f83632d8a
Parents: 4584ac6
Author: Tomas Kratky <tk...@redhat.com>
Authored: Tue Nov 28 16:37:46 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500
----------------------------------------------------------------------
.../integration/amqp/TopicDurableTests.java | 270 +++++++++++++++++++
1 file changed, 270 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbb3aadd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
new file mode 100644
index 0000000..0a1a9d5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class TopicDurableTests extends JMSClientTestSupport {
+
+ @Override
+ protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+ // do not create unnecessary queues
+ }
+
+
+ @Test
+ public void testMessageDurableSubscription() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ testLoop();
+ tearDown();
+ setUp();
+ }
+ }
+
+ private void testLoop() throws Exception {
+ JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ System.err.println("testMessageDurableSubscription");
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic testTopic = session.createTopic("jmsTopic");
+
+ String sub1ID = "sub1DurSub";
+ String sub2ID = "sub2DurSub";
+ MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+ MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
+ MessageProducer messageProducer = session.createProducer(testTopic);
+
+ int count = 100;
+ String batchPrefix = "First";
+ List<Message> listMsgs = generateMessages(session, batchPrefix, count);
+ sendMessages(messageProducer, listMsgs);
+ System.err.println("First batch messages sent");
+
+ List<Message> recvd1 = receiveMessages(subscriber1, count);
+ List<Message> recvd2 = receiveMessages(subscriber2, count);
+
+ assertThat(recvd1.size(), is(count));
+ assertMessageContent(recvd1, batchPrefix);
+ System.err.println(sub1ID + " :First batch messages received");
+
+ assertThat(recvd2.size(), is(count));
+ assertMessageContent(recvd2, batchPrefix);
+ System.err.println(sub2ID + " :First batch messages received");
+
+ subscriber1.close();
+ System.err.println(sub1ID + " : closed");
+
+ batchPrefix = "Second";
+ listMsgs = generateMessages(session, batchPrefix, count);
+ sendMessages(messageProducer, listMsgs);
+ System.err.println("Second batch messages sent");
+
+ recvd2 = receiveMessages(subscriber2, count);
+ assertThat(recvd2.size(), is(count));
+ assertMessageContent(recvd2, batchPrefix);
+ System.err.println(sub2ID + " :Second batch messages received");
+
+ subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+ System.err.println(sub1ID + " :connected");
+
+ recvd1 = receiveMessages(subscriber1, count);
+ assertThat(recvd1.size(), is(count));
+ assertMessageContent(recvd1, batchPrefix);
+ System.err.println(sub1ID + " :Second batch messages received");
+
+ subscriber1.close();
+ subscriber2.close();
+
+ session.unsubscribe(sub1ID);
+ session.unsubscribe(sub2ID);
+ }
+
+
+ @Test
+ public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
+ int iterations = 100;
+ for (int i = 0; i < iterations; i++) {
+ System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+ //SETUP-START
+ JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+ Connection connection1 = connectionFactory1.createConnection();
+
+
+ Hashtable env2 = new Hashtable<Object, Object>();
+ env2.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+ env2.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
+ env2.put("topic." + "jmsTopic", "jmsTopic");
+ Context context2 = new InitialContext(env2);
+ ConnectionFactory connectionFactory2 = (ConnectionFactory) context2.lookup("qpidConnectionFactory");
+ Connection connection2 = connectionFactory2.createConnection();
+
+ connection1.start();
+ connection2.start();
+
+ Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic testTopic = session.createTopic("jmsTopic");
+ //SETUP-END
+
+ //BODY-S
+ String subID = "sharedConsumerNonDurable123";
+ MessageConsumer subscriber1 = session.createSharedConsumer(testTopic, subID);
+ MessageConsumer subscriber2 = session2.createSharedConsumer(testTopic, subID);
+ MessageConsumer subscriber3 = session2.createSharedConsumer(testTopic, subID);
+ MessageProducer messageProducer = session.createProducer(testTopic);
+ messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ int count = 10;
+ List<Message> listMsgs = generateMessages(session, count);
+ List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
+ sendMessages(messageProducer, listMsgs);
+ System.err.println("messages sent");
+
+ assertThat("Each message should be received only by one consumer",
+ results.get(0).get(20, TimeUnit.SECONDS).size() +
+ results.get(1).get(20, TimeUnit.SECONDS).size() +
+ results.get(2).get(20, TimeUnit.SECONDS).size(),
+ is(count));
+ System.err.println("messages received");
+ //BODY-E
+
+ //TEAR-DOWN-S
+ connection1.stop();
+ connection2.stop();
+ subscriber1.close();
+ subscriber2.close();
+ session.close();
+ session2.close();
+ connection1.close();
+ connection2.close();
+ //TEAR-DOWN-E
+ }
+ }
+
+
+ private void sendMessages(MessageProducer producer, List<Message> messages) {
+ messages.forEach(m -> {
+ try {
+ producer.send(m);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ protected List<Message> receiveMessages(MessageConsumer consumer, int count) {
+ return receiveMessages(consumer, count, 0);
+ }
+
+ protected List<Message> receiveMessages(MessageConsumer consumer, int count, long timeout) {
+ List<Message> recvd = new ArrayList<>();
+ IntStream.range(0, count).forEach(i -> {
+ try {
+ recvd.add(timeout > 0 ? consumer.receive(timeout) : consumer.receive());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ });
+ return recvd;
+ }
+
+ protected void assertMessageContent(List<Message> msgs, String content) {
+ msgs.forEach(m -> {
+ try {
+ assertTrue(((TextMessage) m).getText().contains(content));
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ });
+ }
+
+ protected List<Message> generateMessages(Session session, int count) {
+ return generateMessages(session, "", count);
+ }
+
+ protected List<Message> generateMessages(Session session, String prefix, int count) {
+ List<Message> messages = new ArrayList<>();
+ StringBuilder sb = new StringBuilder();
+ IntStream.range(0, count).forEach(i -> {
+ try {
+ messages.add(session.createTextMessage(sb.append(prefix).append("testMessage").append(i).toString()));
+ sb.setLength(0);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ });
+ return messages;
+ }
+
+ protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
+ AtomicInteger totalCount = new AtomicInteger(count);
+ List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
+ List<List<Message>> receivedResList = new ArrayList<>();
+
+ for (int i = 0; i < consumer.length; i++) {
+ final int index = i;
+ resultsList.add(new CompletableFuture<>());
+ receivedResList.add(new ArrayList<>());
+ MessageListener myListener = message -> {
+ System.err.println("Mesages received" + message + " count: " + totalCount.get());
+ receivedResList.get(index).add(message);
+ if (totalCount.decrementAndGet() == 0) {
+ for (int j = 0; j < consumer.length; j++) {
+ resultsList.get(j).complete(receivedResList.get(j));
+ }
+ }
+ };
+ consumer[i].setMessageListener(myListener);
+ }
+ return resultsList;
+ }
+}