You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/04 14:52:00 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1013 Queue deliver after AMQP msg release

ARTEMIS-1013 Queue deliver after AMQP  msg release


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a1955169
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a1955169
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a1955169

Branch: refs/heads/artemis-1009
Commit: a1955169931405637a44af3304065cb8ed50ead7
Parents: 8e9a83d
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Mar 2 14:50:56 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Mar 4 09:51:50 2017 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        |  4 +--
 .../amqp/AmqpReceiverDispositionTest.java       |  9 ++++--
 .../tests/integration/amqp/ProtonTest.java      | 33 ++++++++++++++++++--
 3 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 5931afe..7e44e79 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -326,7 +326,8 @@ public class AMQPSessionCallback implements SessionCallback {
    public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
       recoverContext();
       try {
-         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
+         ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);;
+         ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
       } finally {
          resetContext();
       }
@@ -560,7 +561,6 @@ public class AMQPSessionCallback implements SessionCallback {
       Transaction tx = protonSPI.getTransaction(txid);
       tx.rollback();
       protonSPI.removeTransaction(txid);
-
    }
 
    public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index d92fa0f..f206654 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       receiver1.flow(1);
 
       AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+
+      AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+
       assertNotNull("did not receive message first time", message);
       assertEquals("MessageID:0", message.getMessageId());
 
@@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
       assertNotNull(protonMessage);
       assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
 
+      receiver2.flow(1);
       message.release();
 
-      // Read the message again and validate its state
 
-      AmqpReceiver receiver2 = session.createReceiver(getTestName());
-      receiver2.flow(1);
+      // Read the message again and validate its state
       message = receiver2.receive(10, TimeUnit.SECONDS);
       assertNotNull("did not receive message again", message);
       assertEquals("MessageID:0", message.getMessageId());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 4640c33..16f2e70 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase {
    private static final String amqpConnectionUri = "amqp://localhost:5672";
 
    private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
-
    private static final String brokerName = "my-broker";
 
    private static final long maxSizeBytes = 1 * 1024 * 1024;
@@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase {
       session.close();
       Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
       //because tx commit is executed async on broker, we use a timed wait.
-      assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+      assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
    }
 
    @Test
@@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase {
       session.rollback();
       Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
       //because tx rollback is executed async on broker, we use a timed wait.
-      assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+      assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
 
    }
 
@@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase {
          return count;
       }
    }
+
+   @Test
+   public void testReleaseDisposition() throws Exception {
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection connection = client.connect();
+      try {
+         AmqpSession session = connection.createSession();
+
+         AmqpSender sender = session.createSender(address);
+         AmqpMessage message = new AmqpMessage();
+         message.setText("Test-Message");
+         sender.send(message);
+
+         AmqpReceiver receiver = session.createReceiver(address);
+         receiver.flow(10);
+
+         AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(m1);
+         m1.release();
+
+         //receiver.flow(10);
+         AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(m2);
+         m2.accept();
+      } finally {
+         connection.close();
+      }
+   }
 }