You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2018/01/29 07:47:39 UTC

[2/5] activemq-artemis git commit: ARTEMIS-1638 Fixing Purge rollback behaviour (test only)

ARTEMIS-1638 Fixing Purge rollback behaviour (test only)

Having a commit with just a test will make it easy for developers to check out just this branch
and validate the issue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/69429e4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/69429e4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/69429e4e

Branch: refs/heads/master
Commit: 69429e4e230960c2bd29e031fc270a4a72dac148
Parents: c671fa0
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jan 25 21:17:25 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jan 26 23:24:55 2018 -0500

----------------------------------------------------------------------
 .../integration/amqp/AmqpClientTestSupport.java |   5 +
 .../amqp/AmqpPurgeOnNoConsumersTest.java        | 127 ++++++++++++++-----
 2 files changed, 103 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 044183f..a01c975 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -304,6 +304,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
    }
 
    protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception {
+      sendMessages(destinationName, count, routingType, false);
+   }
+
+   protected void sendMessages(String destinationName, int count, RoutingType routingType, boolean durable) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       try {
@@ -313,6 +317,7 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
          for (int i = 0; i < count; ++i) {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("MessageID:" + i);
+            message.setDurable(true);
             if (routingType != null) {
                message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType());
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/69429e4e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
index 9ded61c..ae4849b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPurgeOnNoConsumersTest.java
@@ -16,10 +16,20 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -28,57 +38,116 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
 public class AmqpPurgeOnNoConsumersTest extends AmqpClientTestSupport {
 
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
    @Test(timeout = 60000)
    public void testQueueReceiverReadMessage() throws Exception {
+      AmqpConnection connection = null;
       String queue = "purgeQueue";
       SimpleString ssQueue = new SimpleString(queue);
+
       server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
       server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
 
       AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
+      connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
 
       final AmqpReceiver receiver = session.createReceiver(queue);
 
-      Queue queueView = getProxyToQueue(queue);
+      QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
       assertEquals(0, queueView.getMessageCount());
 
-      Thread t = new Thread(new Runnable() {
-         @Override
-         public void run() {
-            for (int i = 0; i < 4; i++) {
-               try {
-                  AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-                  receive.accept();
-                  assertNotNull(receive);
-               } catch (Exception e) {
-                  e.printStackTrace();
-               }
-            }
-            try {
-               receiver.close();
-            } catch (IOException e) {
-               e.printStackTrace();
-            }
-         }
-      });
+      sendMessages(queue, 5, null, true);
 
-      t.start();
+      Wait.assertEquals(5, queueView::getMessageCount);
 
       receiver.flow(5);
 
-      sendMessages(queue, 5);
-
-      t.join(5000);
+      for (int i = 0; i < 4; i++) {
+         try {
+            AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+            receive.accept();
+            assertNotNull(receive);
+         } catch (Exception e) {
+            e.printStackTrace();
+         }
+      }
+      try {
+         receiver.close();
+      } catch (IOException e) {
+         e.printStackTrace();
+      }
 
       Wait.assertEquals(0, queueView::getMessageCount);
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
 
       connection.close();
+
+      server.stop();
+
+      server.start();
+
+      queueView = (QueueImpl)getProxyToQueue(queue);
+
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
+   }
+
+
+   // I'm adding the core test here to compare semantics between AMQP and core on this test.
+   @Test(timeout = 60000)
+   public void testPurgeQueueCoreRollback() throws Exception {
+      String queue = "purgeQueue";
+      SimpleString ssQueue = new SimpleString(queue);
+      server.addAddressInfo(new AddressInfo(ssQueue, RoutingType.ANYCAST));
+      server.createQueue(ssQueue, RoutingType.ANYCAST, ssQueue, null, true, false, 1, true, false);
+
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:5672");
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageProducer producer = session.createProducer(session.createQueue("purgeQueue"));
+
+      javax.jms.Queue jmsQueue = session.createQueue(queue);
+      MessageConsumer consumer = session.createConsumer(jmsQueue);
+
+      for (int i = 0; i < 10; i++) {
+         Message message = session.createTextMessage("hello " + i);
+         producer.send(message);
+      }
+      session.commit();
+
+      QueueImpl queueView = (QueueImpl)getProxyToQueue(queue);
+
+      Wait.assertEquals(10, queueView::getMessageCount);
+
+      connection.start();
+
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage txt = (TextMessage)consumer.receive(1000);
+         assertNotNull(txt);
+         assertEquals("hello " + i, txt.getText());
+      }
+      consumer.close();
+      session.rollback();
+      connection.close();
+
+      Wait.assertEquals(0, queueView::getMessageCount);
+
+      server.stop();
+
+      server.start();
+
+      queueView = (QueueImpl)getProxyToQueue(queue);
+
+      assertEquals(0, queueView.getMessageCount());
+      assertEquals(0L, queueView.getPageSubscription().getPagingStore().getAddressSize());
    }
 }