You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/11/29 16:52:14 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1529 Adding test on durable topics

ARTEMIS-1529 Adding test on durable topics


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dbb3aadd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dbb3aadd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dbb3aadd

Branch: refs/heads/master
Commit: dbb3aaddf65f07c5ef77ace3a7efeb3f83632d8a
Parents: 4584ac6
Author: Tomas Kratky <tk...@redhat.com>
Authored: Tue Nov 28 16:37:46 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 29 09:45:09 2017 -0500

----------------------------------------------------------------------
 .../integration/amqp/TopicDurableTests.java     | 270 +++++++++++++++++++
 1 file changed, 270 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dbb3aadd/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
new file mode 100644
index 0000000..0a1a9d5
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/TopicDurableTests.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.activemq.artemis.core.config.DivertConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+
+public class TopicDurableTests extends JMSClientTestSupport {
+
+   @Override
+   protected void createAddressAndQueues(ActiveMQServer server) throws Exception {
+      // do not create unnecessary queues
+   }
+
+
+   @Test
+   public void testMessageDurableSubscription() throws Exception {
+      for (int i = 0; i < 100; i++) {
+         testLoop();
+         tearDown();
+         setUp();
+      }
+   }
+
+   private void testLoop() throws Exception {
+      JmsConnectionFactory connectionFactory = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI() + "?jms.clientID=jmsTopicClient");
+      Connection connection = connectionFactory.createConnection();
+      connection.start();
+
+      System.err.println("testMessageDurableSubscription");
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Topic testTopic =  session.createTopic("jmsTopic");
+
+      String sub1ID = "sub1DurSub";
+      String sub2ID = "sub2DurSub";
+      MessageConsumer subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+      MessageConsumer subscriber2 = session.createDurableSubscriber(testTopic, sub2ID);
+      MessageProducer messageProducer = session.createProducer(testTopic);
+
+      int count = 100;
+      String batchPrefix = "First";
+      List<Message> listMsgs = generateMessages(session, batchPrefix, count);
+      sendMessages(messageProducer, listMsgs);
+      System.err.println("First batch messages sent");
+
+      List<Message> recvd1 = receiveMessages(subscriber1, count);
+      List<Message> recvd2 = receiveMessages(subscriber2, count);
+
+      assertThat(recvd1.size(), is(count));
+      assertMessageContent(recvd1, batchPrefix);
+      System.err.println(sub1ID + " :First batch messages received");
+
+      assertThat(recvd2.size(), is(count));
+      assertMessageContent(recvd2, batchPrefix);
+      System.err.println(sub2ID + " :First batch messages received");
+
+      subscriber1.close();
+      System.err.println(sub1ID + " : closed");
+
+      batchPrefix = "Second";
+      listMsgs = generateMessages(session, batchPrefix, count);
+      sendMessages(messageProducer, listMsgs);
+      System.err.println("Second batch messages sent");
+
+      recvd2 = receiveMessages(subscriber2, count);
+      assertThat(recvd2.size(), is(count));
+      assertMessageContent(recvd2, batchPrefix);
+      System.err.println(sub2ID + " :Second batch messages received");
+
+      subscriber1 = session.createDurableSubscriber(testTopic, sub1ID);
+      System.err.println(sub1ID + " :connected");
+
+      recvd1 = receiveMessages(subscriber1, count);
+      assertThat(recvd1.size(), is(count));
+      assertMessageContent(recvd1, batchPrefix);
+      System.err.println(sub1ID + " :Second batch messages received");
+
+      subscriber1.close();
+      subscriber2.close();
+
+      session.unsubscribe(sub1ID);
+      session.unsubscribe(sub2ID);
+   }
+
+
+   @Test
+   public void testSharedNonDurableSubscription() throws JMSException, NamingException, InterruptedException, ExecutionException, TimeoutException {
+      int iterations = 100;
+      for (int i = 0; i < iterations; i++) {
+         System.err.println("testSharedNonDurableSubscription; iteration: " + i);
+         //SETUP-START
+         JmsConnectionFactory connectionFactory1 = new JmsConnectionFactory(getBrokerQpidJMSConnectionURI());
+         Connection connection1 = connectionFactory1.createConnection();
+
+
+         Hashtable env2 = new Hashtable<Object, Object>();
+         env2.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
+         env2.put("connectionfactory.qpidConnectionFactory", "amqp://localhost:5672");
+         env2.put("topic." + "jmsTopic", "jmsTopic");
+         Context context2 = new InitialContext(env2);
+         ConnectionFactory connectionFactory2 = (ConnectionFactory) context2.lookup("qpidConnectionFactory");
+         Connection connection2 = connectionFactory2.createConnection();
+
+         connection1.start();
+         connection2.start();
+
+         Session session = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic testTopic = session.createTopic("jmsTopic");
+         //SETUP-END
+
+         //BODY-S
+         String subID = "sharedConsumerNonDurable123";
+         MessageConsumer subscriber1 = session.createSharedConsumer(testTopic, subID);
+         MessageConsumer subscriber2 = session2.createSharedConsumer(testTopic, subID);
+         MessageConsumer subscriber3 = session2.createSharedConsumer(testTopic, subID);
+         MessageProducer messageProducer = session.createProducer(testTopic);
+         messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+         int count = 10;
+         List<Message> listMsgs = generateMessages(session, count);
+         List<CompletableFuture<List<Message>>> results = receiveMessagesAsync(count, subscriber1, subscriber2, subscriber3);
+         sendMessages(messageProducer, listMsgs);
+         System.err.println("messages sent");
+
+         assertThat("Each message should be received only by one consumer",
+                    results.get(0).get(20, TimeUnit.SECONDS).size() +
+                       results.get(1).get(20, TimeUnit.SECONDS).size() +
+                       results.get(2).get(20, TimeUnit.SECONDS).size(),
+                    is(count));
+         System.err.println("messages received");
+         //BODY-E
+
+         //TEAR-DOWN-S
+         connection1.stop();
+         connection2.stop();
+         subscriber1.close();
+         subscriber2.close();
+         session.close();
+         session2.close();
+         connection1.close();
+         connection2.close();
+         //TEAR-DOWN-E
+      }
+   }
+
+
+   private void sendMessages(MessageProducer producer, List<Message> messages) {
+      messages.forEach(m -> {
+         try {
+            producer.send(m);
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+   }
+
+   protected List<Message> receiveMessages(MessageConsumer consumer, int count) {
+      return receiveMessages(consumer, count, 0);
+   }
+
+   protected List<Message> receiveMessages(MessageConsumer consumer, int count, long timeout) {
+      List<Message> recvd = new ArrayList<>();
+      IntStream.range(0, count).forEach(i -> {
+         try {
+            recvd.add(timeout > 0 ? consumer.receive(timeout) : consumer.receive());
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+      return recvd;
+   }
+
+   protected void assertMessageContent(List<Message> msgs, String content) {
+      msgs.forEach(m -> {
+         try {
+            assertTrue(((TextMessage) m).getText().contains(content));
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+   }
+
+   protected List<Message> generateMessages(Session session, int count) {
+      return generateMessages(session, "", count);
+   }
+
+   protected List<Message> generateMessages(Session session, String prefix, int count) {
+      List<Message> messages = new ArrayList<>();
+      StringBuilder sb = new StringBuilder();
+      IntStream.range(0, count).forEach(i -> {
+         try {
+            messages.add(session.createTextMessage(sb.append(prefix).append("testMessage").append(i).toString()));
+            sb.setLength(0);
+         } catch (JMSException e) {
+            e.printStackTrace();
+         }
+      });
+      return messages;
+   }
+
+   protected List<CompletableFuture<List<Message>>> receiveMessagesAsync(int count, MessageConsumer... consumer) throws JMSException {
+      AtomicInteger totalCount = new AtomicInteger(count);
+      List<CompletableFuture<List<Message>>> resultsList = new ArrayList<>();
+      List<List<Message>> receivedResList = new ArrayList<>();
+
+      for (int i = 0; i < consumer.length; i++) {
+         final int index = i;
+         resultsList.add(new CompletableFuture<>());
+         receivedResList.add(new ArrayList<>());
+         MessageListener myListener = message -> {
+            System.err.println("Mesages received" + message + " count: " + totalCount.get());
+            receivedResList.get(index).add(message);
+            if (totalCount.decrementAndGet() == 0) {
+               for (int j = 0; j < consumer.length; j++) {
+                  resultsList.get(j).complete(receivedResList.get(j));
+               }
+            }
+         };
+         consumer[i].setMessageListener(myListener);
+      }
+      return resultsList;
+   }
+}