You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/21 22:46:54 UTC

[3/4] qpid-jms git commit: QPIDJMS-121 Add support for controlling the message disposition sent on a call to Message.acknowledge when in a client ack session. The mode is set via a vendor property JMS_AMQP_ACK_TYPE with values for ACCEPTED, REJECTED, RELEASED, MODIFIED_FAILED and MODIFIED_FAILED_UNDELIVERABLE.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ca457d73/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AmqpAcknowledgementsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AmqpAcknowledgementsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AmqpAcknowledgementsIntegrationTest.java
new file mode 100644
index 0000000..e6d557d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/AmqpAcknowledgementsIntegrationTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.message.JmsMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ReleasedMatcher;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+/**
+ * Verifies the AMQP disposition sent for messages consumed in a
+ * {@link Session#CLIENT_ACKNOWLEDGE} session when {@link Message#acknowledge()}
+ * is called, both by default and when a specific outcome is requested via the
+ * {@link JmsMessageSupport#JMS_AMQP_ACK_TYPE} message property.
+ */
+public class AmqpAcknowledgementsIntegrationTest extends QpidJmsTestCase {
+
+    /** Sentinel disposition value: acknowledge without setting the ack type property. */
+    private static final int SKIP = -1;
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testDefaultAcceptMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(SKIP, new AcceptedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestAcceptMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.ACCEPTED, new AcceptedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestRejectMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.REJECTED, new RejectedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestReleaseMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestReleaseMessagesClearPropsFirst() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), true);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestModifiedFailedMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.MODIFIED_FAILED, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestModifiedFailedUndeliverableHereMessages() throws Exception {
+        doTestAmqpAcknowledgementTestImpl(JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE, new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(true)), false);
+    }
+
+    /**
+     * Receives the expected messages with blocking receive calls, acknowledges only
+     * the last one and waits for the peer to see a matching disposition for each
+     * delivered message.
+     *
+     * @param disposition
+     *        value to set as the JMS_AMQP_ACK_TYPE property of the last message, or
+     *        {@link #SKIP} to acknowledge without setting it.
+     * @param descriptorMatcher
+     *        matcher handed to the test peer for every expected disposition.
+     * @param clearPropsFirst
+     *        whether to clear the message properties before setting the ack type.
+     *
+     * @throws Exception if the test fails unexpectedly.
+     */
+    private void doTestAmqpAcknowledgementTestImpl(int disposition, Matcher<?> descriptorMatcher, boolean clearPropsFirst) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            int msgCount = 3;
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), msgCount);
+            // Only the last message is acknowledged, yet a disposition is expected for every delivery.
+            for (int i = 1; i <= msgCount; i++) {
+                testPeer.expectDisposition(true, descriptorMatcher);
+            }
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+
+            Message lastReceivedMessage = null;
+            for (int i = 1; i <= msgCount; i++) {
+                lastReceivedMessage = messageConsumer.receive(6000);
+                assertNotNull("Message " + i + " was not recieved", lastReceivedMessage);
+            }
+
+            acknowledgeWithDisposition(lastReceivedMessage, disposition, clearPropsFirst);
+
+            // Close the connection so it is not leaked past the end of the test.
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testDefaultAcceptMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(SKIP, new AcceptedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestAcceptMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.ACCEPTED, new AcceptedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestRejectMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.REJECTED, new RejectedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestReleaseMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestReleaseMessagesClearPropsFirstWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.RELEASED, new ReleasedMatcher(), true);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestModifiedFailedMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.MODIFIED_FAILED, new ModifiedMatcher().withDeliveryFailed(equalTo(true)), false);
+    }
+
+    @Test(timeout = 20000)
+    public void testRequestModifiedFailedUndeliverableHereMessagesWithMessageListener() throws Exception {
+        doTestAmqpAcknowledgementAsyncTestImpl(JmsMessageSupport.MODIFIED_FAILED_UNDELIVERABLE, new ModifiedMatcher().withDeliveryFailed(equalTo(true)).withUndeliverableHere(equalTo(true)), false);
+    }
+
+    /**
+     * Receives the expected messages through a {@link MessageListener}, acknowledges
+     * only the last one from the test thread and waits for the peer to see a matching
+     * disposition for each delivered message.
+     *
+     * @param disposition
+     *        value to set as the JMS_AMQP_ACK_TYPE property of the last message, or
+     *        {@link #SKIP} to acknowledge without setting it.
+     * @param descriptorMatcher
+     *        matcher handed to the test peer for every expected disposition.
+     * @param clearPropsFirst
+     *        whether to clear the message properties before setting the ack type.
+     *
+     * @throws Exception if the test fails unexpectedly.
+     */
+    private void doTestAmqpAcknowledgementAsyncTestImpl(int disposition, Matcher<?> descriptorMatcher, boolean clearPropsFirst) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            final int msgCount = 3;
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(null), msgCount);
+            // Only the last message is acknowledged, yet a disposition is expected for every delivery.
+            for (int i = 1; i <= msgCount; i++) {
+                testPeer.expectDisposition(true, descriptorMatcher);
+            }
+
+            final CountDownLatch receiveCountDown = new CountDownLatch(msgCount);
+            final AtomicReference<Message> lastReceivedMessage = new AtomicReference<Message>();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            messageConsumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    lastReceivedMessage.set(message);
+                    receiveCountDown.countDown();
+                }
+            });
+
+            assertTrue("Did not get all messages", receiveCountDown.await(10, TimeUnit.SECONDS));
+            assertNotNull("Message was not received", lastReceivedMessage.get());
+
+            acknowledgeWithDisposition(lastReceivedMessage.get(), disposition, clearPropsFirst);
+
+            // Close the connection so it is not leaked past the end of the test.
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    /**
+     * Sets the requested ack type on the message, unless it is {@link #SKIP}, and
+     * then acknowledges it.
+     */
+    private static void acknowledgeWithDisposition(Message message, int disposition, boolean clearPropsFirst) throws Exception {
+        if (disposition != SKIP) {
+            if (clearPropsFirst) {
+                message.clearProperties();
+            }
+            message.setIntProperty(JmsMessageSupport.JMS_AMQP_ACK_TYPE, disposition);
+        }
+
+        message.acknowledge();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org