You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/04 14:52:00 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1013 Queue deliver after AMQP msg release
ARTEMIS-1013 Queue deliver after AMQP msg release
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a1955169
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a1955169
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a1955169
Branch: refs/heads/artemis-1009
Commit: a1955169931405637a44af3304065cb8ed50ead7
Parents: 8e9a83d
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Mar 2 14:50:56 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Mar 4 09:51:50 2017 -0500
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 4 +--
.../amqp/AmqpReceiverDispositionTest.java | 9 ++++--
.../tests/integration/amqp/ProtonTest.java | 33 ++++++++++++++++++--
3 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 5931afe..7e44e79 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -326,7 +326,8 @@ public class AMQPSessionCallback implements SessionCallback {
public void cancel(Object brokerConsumer, Message message, boolean updateCounts) throws Exception {
recoverContext();
try {
- ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);
+ ((ServerConsumer) brokerConsumer).individualCancel(message.getMessageID(), updateCounts);;
+ ((ServerConsumer) brokerConsumer).getQueue().forceDelivery();
} finally {
resetContext();
}
@@ -560,7 +561,6 @@ public class AMQPSessionCallback implements SessionCallback {
Transaction tx = protonSPI.getTransaction(txid);
tx.rollback();
protonSPI.removeTransaction(txid);
-
}
public SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
index d92fa0f..f206654 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverDispositionTest.java
@@ -44,6 +44,10 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
receiver1.flow(1);
AmqpMessage message = receiver1.receive(5, TimeUnit.SECONDS);
+
+ AmqpReceiver receiver2 = session.createReceiver(getTestName());
+
+
assertNotNull("did not receive message first time", message);
assertEquals("MessageID:0", message.getMessageId());
@@ -51,12 +55,11 @@ public class AmqpReceiverDispositionTest extends AmqpClientTestSupport {
assertNotNull(protonMessage);
assertEquals("Unexpected initial value for AMQP delivery-count", 0, protonMessage.getDeliveryCount());
+ receiver2.flow(1);
message.release();
- // Read the message again and validate its state
- AmqpReceiver receiver2 = session.createReceiver(getTestName());
- receiver2.flow(1);
+ // Read the message again and validate its state
message = receiver2.receive(10, TimeUnit.SECONDS);
assertNotNull("did not receive message again", message);
assertEquals("MessageID:0", message.getMessageId());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a1955169/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
index 4640c33..16f2e70 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ProtonTest.java
@@ -110,7 +110,6 @@ public class ProtonTest extends ProtonTestBase {
private static final String amqpConnectionUri = "amqp://localhost:5672";
private static final String tcpAmqpConnectionUri = "tcp://localhost:5672";
-
private static final String brokerName = "my-broker";
private static final long maxSizeBytes = 1 * 1024 * 1024;
@@ -472,7 +471,7 @@ public class ProtonTest extends ProtonTestBase {
session.close();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx commit is executed async on broker, we use a timed wait.
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
}
@Test
@@ -548,7 +547,7 @@ public class ProtonTest extends ProtonTestBase {
session.rollback();
Queue q = (Queue) server.getPostOffice().getBinding(new SimpleString(coreAddress)).getBindable();
//because tx rollback is executed async on broker, we use a timed wait.
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, ()-> q.getMessageCount() == 10));
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> q.getMessageCount() == 10));
}
@@ -1855,4 +1854,32 @@ public class ProtonTest extends ProtonTestBase {
return count;
}
}
+
+ @Test
+ public void testReleaseDisposition() throws Exception {
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpConnection connection = client.connect();
+ try {
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender(address);
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message");
+ sender.send(message);
+
+ AmqpReceiver receiver = session.createReceiver(address);
+ receiver.flow(10);
+
+ AmqpMessage m1 = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(m1);
+ m1.release();
+
+ // Deliberately no second receiver.flow(10) here: the single sent message, once released, must arrive again on this same receiver without another flow() call.
+ AmqpMessage m2 = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(m2);
+ m2.accept();
+ } finally {
+ connection.close();
+ }
+ }
}