You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/02 18:14:42 UTC

[2/2] activemq-artemis git commit: ARTEMIS-268 Adds tests that shows issue with presettled receivers

ARTEMIS-268 Adds tests that shows issue with presettled receivers

Tests added that show that a receiver attached to a queue as presettled
are not removing the messages that are dispatched to them.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b07d6a9e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b07d6a9e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b07d6a9e

Branch: refs/heads/master
Commit: b07d6a9e75ddc305a31e39ee41b82b361a9564f3
Parents: 3066690
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 11 14:07:02 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Nov 2 14:14:35 2016 -0400

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  |   2 +
 .../amqp/AmqpPresettledReceiverTest.java        | 270 +++++++++++++++++++
 2 files changed, 272 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b07d6a9e/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 76279c5..adb7acc 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -482,6 +482,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             sender.send(nettyBuffer.array(), nettyBuffer.arrayOffset() + nettyBuffer.readerIndex(), nettyBuffer.readableBytes());
 
             if (preSettle) {
+               // Presettled means the client implicitly accepts any delivery we send it.
+               sessionSPI.ack(null, brokerConsumer, message);
                delivery.settle();
             } else {
                sender.advance();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b07d6a9e/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
new file mode 100644
index 0000000..657aff7
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpPresettledReceiverTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpPresettledReceiverTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverAndNonPresettledReceiverOnSameQueue() throws Exception {
+      final int MSG_COUNT = 2;
+      sendMessages(getTestName(), MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver1 = session.createReceiver(getTestName(), null, false, true);
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      receiver1.flow(1);
+      receiver2.flow(1);
+
+      AmqpMessage message1 = receiver1.receive(5, TimeUnit.SECONDS);
+      AmqpMessage message2 = receiver2.receive(5, TimeUnit.SECONDS);
+
+      assertNotNull(message1);
+      assertNotNull(message2);
+
+      // Receiver 1 is presettled so messages are not accepted.
+      assertTrue(message1.getWrappedDelivery().remotelySettled());
+
+      // Receiver 2 is not presettled so it needs to accept.
+      message2.accept();
+
+      receiver1.close();
+      receiver2.close();
+
+      System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
+
+      // Should be nothing left on the Queue
+      AmqpReceiver receiver3 = session.createReceiver(getTestName());
+      receiver3.flow(1);
+
+      AmqpMessage received = receiver3.receive(5, TimeUnit.SECONDS);
+      if (received != null) {
+         System.out.println("Message read: " + received.getMessageId());
+      }
+      assertNull(received);
+
+      assertEquals(0, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverReadsAllMessages() throws Exception {
+      final int MSG_COUNT = 100;
+      sendMessages(getTestName(), MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
+
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      receiver.flow(MSG_COUNT);
+      for (int i = 0; i < MSG_COUNT; ++i) {
+         assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+      receiver.close();
+
+      System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
+
+      // Open a new receiver and see if any message are left on the Queue
+      receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      if (received != null) {
+         System.out.println("Message read: " + received.getMessageId());
+      }
+      assertNull(received);
+
+      assertEquals(0, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverReadsAllMessagesInWhenReadInBatches() throws Exception {
+      final int MSG_COUNT = 100;
+      sendMessages(getTestName(), MSG_COUNT);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
+
+      final Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(MSG_COUNT, queueView.getMessageCount());
+
+      // Consume all 100 but do so in batches by flowing only limited credit.
+
+      receiver.flow(20);
+      // consume less that flow
+      for (int j = 0; j < 10; j++) {
+         assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+
+      // flow more and consume all
+      receiver.flow(10);
+      for (int j = 0; j < 20; j++) {
+         assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+
+      // remainder
+      receiver.flow(70);
+      for (int j = 0; j < 70; j++) {
+         assertNotNull(receiver.receive(5, TimeUnit.SECONDS));
+      }
+
+      receiver.close();
+
+      System.out.println("Message Count after all consumed: " + queueView.getMessageCount());
+
+      // Open a new receiver and see if any message are left on the Queue
+      receiver = session.createReceiver(getTestName());
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      if (received != null) {
+         System.out.println("Message read: " + received.getMessageId());
+      }
+      assertNull(received);
+
+      assertEquals(0, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverWithinBoundsOfActiveTXWithCommit() throws Exception {
+      doTestPresettledReceiverWithinBoundsOfActiveTX(true);
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverWithinBoundsOfActiveTXWithRollback() throws Exception {
+      doTestPresettledReceiverWithinBoundsOfActiveTX(false);
+   }
+
+   private void doTestPresettledReceiverWithinBoundsOfActiveTX(boolean commit) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      final Queue queue = getProxyToQueue(getTestName());
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      assertEquals(1, queue.getMessageCount());
+
+      AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
+
+      session.begin();
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      assertTrue(received.getWrappedDelivery().remotelySettled());
+
+      if (commit) {
+         session.commit();
+      } else {
+         session.rollback();
+      }
+
+      assertEquals(0, queue.getMessageCount());
+
+      sender.close();
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testPresettledReceiverWithinBoundsOfActiveTXWithSendAndRollback() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpSender sender = session.createSender(getTestName());
+      final Queue queue = getProxyToQueue(getTestName());
+
+      AmqpMessage message = new AmqpMessage();
+      message.setText("Test-Message");
+      sender.send(message);
+
+      assertEquals(1, queue.getMessageCount());
+
+      AmqpReceiver receiver = session.createReceiver(getTestName(), null, false, true);
+
+      session.begin();
+
+      receiver.flow(1);
+      AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(received);
+      assertTrue(received.getWrappedDelivery().remotelySettled());
+
+      message = new AmqpMessage();
+      message.setText("Test-Message - Rolled Back");
+      sender.send(message);
+
+      session.rollback();
+
+      assertEquals(0, queue.getMessageCount());
+
+      sender.close();
+      connection.close();
+   }
+
+   public void sendMessages(String destinationName, int count) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+}