You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/28 18:48:29 UTC

[1/2] qpid-jms git commit: QPIDJMS-98: convert local expiration tests to use the test peer

Repository: qpid-jms
Updated Branches:
  refs/heads/master 4ad5a685c -> 10b5ed824


QPIDJMS-98: convert local expiration tests to use the test peer


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b672bf07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b672bf07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b672bf07

Branch: refs/heads/master
Commit: b672bf0799ca8a646e4bf84d5d992d267e072622
Parents: 4ad5a68
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 28 17:06:52 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 28 17:46:45 2015 +0100

----------------------------------------------------------------------
 .../MessageExpirationIntegrationTest.java       | 265 +++++++++++++++++++
 .../jms/integration/SessionIntegrationTest.java |   2 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  32 ++-
 .../JmsExpiredMessageConsumptionTest.java       | 247 -----------------
 4 files changed, 292 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
new file mode 100644
index 0000000..b0f222b
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageExpirationIntegrationTest extends QpidJmsTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageExpirationIntegrationTest.class);
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout=20000)
+    public void testIncomingExpiredMessageGetsFiltered() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach and send credit, then send it an
+            // already-expired message followed by a live message.
+            testPeer.expectReceiverAttach();
+
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired"));
+
+            String liveMsgContent = "valid";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Call receive, expect the first message to be filtered due to expiry,
+            // and the second message to be given to the test app and accepted.
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modified, 1, 1);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            Message m = consumer.receive(3000);
+            assertNotNull("Message should have been received", m);
+            assertTrue(m instanceof TextMessage);
+            assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)m).getText());
+
+            // Verify the other message is not there
+            m = consumer.receive(10);
+            assertNull("Message should not have been received", m);
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testIncomingExpiredMessageGetsConsumedWhenFilterDisabled() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessageExpiry=false");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach and send credit, then send it an
+            // already-expired message followed by a live message.
+            testPeer.expectReceiverAttach();
+
+            String expiredMsgContent = "already-expired";
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMsgContent));
+
+            String liveMsgContent = "valid";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Call receive, expect the expired message as we disabled local expiry.
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+
+            Message m = consumer.receive(3000);
+            assertNotNull("Message should have been received", m);
+            assertTrue(m instanceof TextMessage);
+            assertEquals("Unexpected message content", expiredMsgContent, ((TextMessage)m).getText());
+
+            // Verify the other message is there
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            m = consumer.receive(3000);
+            assertNotNull("Message should have been received", m);
+            assertTrue(m instanceof TextMessage);
+            assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)m).getText());
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testIncomingExpiredMessageGetsFilteredAsync() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach and send credit, then send it an
+            // already-expired message followed by a live message.
+            testPeer.expectReceiverAttach();
+
+            String expiredMessageContent = "already-expired";
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMessageContent));
+
+            final String liveMsgContent = "valid";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Add message listener, expect the first message to be filtered due to expiry,
+            // and the second message to be given to the test app and accepted.
+            ModifiedMatcher modified = new ModifiedMatcher();
+            modified.withDeliveryFailed(equalTo(true));
+            modified.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modified, 1, 1);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            final CountDownLatch success = new CountDownLatch(1);
+            final AtomicBoolean listenerFailure = new AtomicBoolean();
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message incoming) {
+                    try {
+                        TextMessage textMessage = (TextMessage) incoming;
+                        if (liveMsgContent.equals(textMessage.getText())) {
+                            success.countDown();
+                        } else {
+                            listenerFailure.set(true);
+                            LOG.error("Received unexpected message:" + incoming);
+                        }
+                    } catch (Exception e) {
+                        listenerFailure.set(true);
+                        LOG.error("Exception in listener", e);
+                    }
+                }
+            });
+
+            assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
+            assertFalse("There was a failure in the listener, see logs", listenerFailure.get());
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testIncomingExpiredMessageGetsConsumedWhenFilterDisabledAsync() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessageExpiry=false");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach and send credit, then send it an
+            // already-expired message followed by a live message.
+            testPeer.expectReceiverAttach();
+
+            final String expiredMessageContent = "already-expired";
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMessageContent));
+
+            final String liveMsgContent = "valid";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Add message listener, expect both messages as the filter is disabled
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            final CountDownLatch success = new CountDownLatch(2);
+            final AtomicBoolean listenerFailure = new AtomicBoolean();
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message incoming) {
+                    try {
+                        TextMessage textMessage = (TextMessage) incoming;
+                        if (expiredMessageContent.equals(textMessage.getText()) || liveMsgContent.equals(textMessage.getText())) {
+                            success.countDown();
+                        } else {
+                            listenerFailure.set(true);
+                            LOG.error("Received unexpected message:" + incoming);
+                        }
+                    } catch (Exception e) {
+                        listenerFailure.set(true);
+                        LOG.error("Exception in listener", e);
+                    }
+                }
+            });
+
+            assertTrue("didn't get expected messages", success.await(5, TimeUnit.SECONDS));
+            assertFalse("There was a failure in the listener, see logs", listenerFailure.get());
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cc61286..0dd4ece 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1430,7 +1430,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
 
             //Expect the session close
             testPeer.expectEnd(false);
-            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession();
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1);
             testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
             testPeer.remotelyEndLastOpenedSession(false, 200);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 01b7e3b..aab25a3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1358,6 +1358,11 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectDisposition(boolean settled, Matcher<?> stateMatcher)
     {
+        expectDisposition(settled, stateMatcher, null, null);
+    }
+
+    public void expectDisposition(boolean settled, Matcher<?> stateMatcher, Integer firstDeliveryId, Integer lastDeliveryId)
+    {
         Matcher<Boolean> settledMatcher = null;
         if(settled)
         {
@@ -1368,8 +1373,20 @@ public class TestAmqpPeer implements AutoCloseable
             settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
         }
 
+        Matcher<?> firstDeliveryIdMatcher = notNullValue();
+        if(firstDeliveryId != null) {
+            firstDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(firstDeliveryId));
+        }
+
+        Matcher<?> lastDeliveryIdMatcher = notNullValue();
+        if(lastDeliveryId != null) {
+            lastDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(lastDeliveryId));
+        }
+
         addHandler(new DispositionMatcher()
             .withSettled(settledMatcher)
+            .withFirst(firstDeliveryIdMatcher)
+            .withLast(lastDeliveryIdMatcher)
             .withState(stateMatcher));
     }
 
@@ -1539,22 +1556,25 @@ public class TestAmqpPeer implements AutoCloseable
         return comp;
     }
 
-    public void sendTransferToLastOpenedLinkOnLastOpenedSession() {
+    public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
+                                                                final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+                                                                final PropertiesDescribedType propertiesDescribedType,
+                                                                final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+                                                                final DescribedType content,
+                                                                final int nextIncomingDeliveryId) {
         synchronized (_handlersLock) {
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
 
-            final int nextId = 0; //TODO: shouldn't be hard coded
-
-            String tagString = "theDeliveryTag" + nextId;
+            String tagString = "theDeliveryTag" + nextIncomingDeliveryId;
             Binary dtag = new Binary(tagString.getBytes());
 
             final TransferFrame transferResponse = new TransferFrame()
-            .setDeliveryId(UnsignedInteger.valueOf(nextId))
+            .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
             .setDeliveryTag(dtag)
             .setMessageFormat(UnsignedInteger.ZERO)
             .setSettled(false);
 
-            Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("myTextMessage"));
+            Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);
 
             // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
             final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
deleted file mode 100644
index 0920f38..0000000
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.consumer;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.qpid.jms.JmsConnection;
-import org.apache.qpid.jms.support.AmqpTestSupport;
-import org.apache.qpid.jms.support.Wait;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class);
-
-    @Override
-    protected void addAdditionalBrokerPlugins(List<BrokerPlugin> plugins) {
-        @SuppressWarnings("unchecked")
-        BrokerPlugin expireOutbound = new BrokerPluginSupport() {
-
-            @Override
-            public void preProcessDispatch(MessageDispatch messageDispatch) {
-                if (messageDispatch.getMessage() != null) {
-                    LOG.trace("Preprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
-                    if (messageDispatch.getMessage().getExpiration() != 0) {
-                        messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
-                    }
-                }
-
-                super.preProcessDispatch(messageDispatch);
-            }
-        };
-
-        plugins.add(expireOutbound);
-    }
-
-    @Override
-    protected void configureBrokerPolicies(BrokerService broker) {
-        PolicyMap policyMap = new PolicyMap();
-        PolicyEntry defaultEntry = new PolicyEntry();
-        defaultEntry.setExpireMessagesPeriod(60000);
-        defaultEntry.setUseCache(false);
-        policyMap.setDefaultEntry(defaultEntry);
-
-        broker.setDestinationPolicy(policyMap);
-    }
-
-    @Test(timeout = 60000)
-    public void testConsumerExpiredMessageSync() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
-
-        producer.setTimeToLive(500);
-        producer.send(session.createTextMessage("Message-1"));
-
-        producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-        producer.send(session.createTextMessage("Message-2"));
-
-        Message received = consumer.receive(5000);
-        assertNotNull(received);
-        TextMessage textMessage = (TextMessage) received;
-        assertTrue(textMessage.getText().endsWith("2"));
-
-        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
-        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return proxy.getQueueSize() == 0;
-            }
-        }));
-
-        final QueueViewMBean dlqProxy = getProxyToQueue("ActiveMQ.DLQ");
-        assertTrue("Queued message did not get sent to DLQ.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dlqProxy.getQueueSize() == 1;
-            }
-        }));
-    }
-
-    @Test(timeout = 60000)
-    public void testConsumerExpiredMessageAsync() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        final CountDownLatch success = new CountDownLatch(1);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
-
-        producer.setTimeToLive(500);
-        producer.send(session.createTextMessage("Message-1"));
-
-        producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
-        producer.send(session.createTextMessage("Message-2"));
-
-        consumer.setMessageListener(new MessageListener() {
-
-            @Override
-            public void onMessage(Message incoming) {
-                TextMessage textMessage = (TextMessage) incoming;
-                try {
-                    if (textMessage.getText().endsWith("2")) {
-                        success.countDown();
-                    }
-                } catch (JMSException e) {
-                }
-            }
-        });
-
-        assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
-
-        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
-        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return proxy.getQueueSize() == 0;
-            }
-        }));
-    }
-
-    @Test(timeout = 60000)
-    public void testConsumerExpirationFilterDisabledSync() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.setLocalMessageExpiry(false);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
-
-        producer.setTimeToLive(500);
-        producer.send(session.createTextMessage("Message-1"));
-
-        Message received = consumer.receive(5000);
-        assertNotNull(received);
-        TextMessage textMessage = (TextMessage) received;
-        assertTrue(textMessage.getText().endsWith("1"));
-
-        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
-        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return proxy.getQueueSize() == 0;
-            }
-        }));
-    }
-
-    @Test(timeout = 60000)
-    public void testConsumerExpirationFilterDisabledAsync() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.setLocalMessageExpiry(false);
-
-        final CountDownLatch success = new CountDownLatch(1);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-
-        MessageConsumer consumer = session.createConsumer(queue);
-        MessageProducer producer = session.createProducer(queue);
-
-        producer.setTimeToLive(500);
-        producer.send(session.createTextMessage("Message-1"));
-
-        consumer.setMessageListener(new MessageListener() {
-
-            @Override
-            public void onMessage(Message incoming) {
-                TextMessage textMessage = (TextMessage) incoming;
-                try {
-                    if (textMessage.getText().endsWith("1")) {
-                        success.countDown();
-                    }
-                } catch (JMSException e) {
-                }
-            }
-        });
-
-        assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
-
-        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
-        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return proxy.getQueueSize() == 0;
-            }
-        }));
-    }
-}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-98: convert the ZeroPrefetch+Expiry test to use the test peer rather than broker

Posted by ro...@apache.org.
QPIDJMS-98: convert the ZeroPrefetch+Expiry test to use the test peer rather than broker


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10b5ed82
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10b5ed82
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10b5ed82

Branch: refs/heads/master
Commit: 10b5ed824daea964addfc4113ea48fe45125d56d
Parents: b672bf0
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 28 17:36:36 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 28 17:47:41 2015 +0100

----------------------------------------------------------------------
 .../ZeroPrefetchIntegrationTest.java            | 92 ++++++++++++++++++++
 .../qpid/jms/consumer/JmsZeroPrefetchTest.java  | 33 -------
 2 files changed, 92 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10b5ed82/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
new file mode 100644
index 0000000..37ffeba
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout=20000)
+    public void testZeroPrefetchConsumerReceiveWithMessageExpiredInFlight() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Expect that once receive is called, it flows a credit, give it an already-expired message.
+            // Expect it to be filtered due to local expiration checking.
+            PropertiesDescribedType props = new PropertiesDescribedType();
+            props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired"));
+
+            ModifiedMatcher modifiedMatcher = new ModifiedMatcher();
+            modifiedMatcher.withDeliveryFailed(equalTo(true));
+            modifiedMatcher.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectDisposition(true, modifiedMatcher, 1, 1);
+
+            // Expect the client to then flow another credit requesting a message.
+            testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
+
+            // Send it a live message, expect it to get accepted.
+            String liveMsgContent = "valid";
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+            Message m = consumer.receive(5000);
+            assertNotNull("Message should have been received", m);
+            assertTrue(m instanceof TextMessage);
+            assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText());
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10b5ed82/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index f8b911b..ead4f5d 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -312,37 +312,4 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
         assertNull(message);
     }
 
-    @Test(timeout=20000)
-    public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws Exception {
-        connection = createAmqpConnection();
-        connection.start();
-
-        JmsConnection jmsConnection = (JmsConnection) connection;
-        jmsConnection.getPrefetchPolicy().setAll(0);
-
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue(name.getMethodName());
-        MessageProducer producer = session.createProducer(queue);
-        TextMessage expiredMessage = session.createTextMessage("expired message");
-        TextMessage validMessage = session.createTextMessage("valid message");
-        producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 50);
-        producer.send(validMessage);
-        session.close();
-
-        session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = session.createConsumer(queue);
-        Message message = consumer.receive(3000);
-        assertNotNull(message);
-        TextMessage received = (TextMessage) message;
-        assertEquals("expired message", received.getText());
-
-        // Rollback allow the first message to expire.
-        session.rollback();
-
-        // Consume again, this should fetch the second valid message via a pull.
-        message = consumer.receive(3000);
-        assertNotNull(message);
-        received = (TextMessage) message;
-        assertEquals("valid message", received.getText());
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org