You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/28 18:48:29 UTC
[1/2] qpid-jms git commit: QPIDJMS-98: convert local expiration tests
to use the test peer
Repository: qpid-jms
Updated Branches:
refs/heads/master 4ad5a685c -> 10b5ed824
QPIDJMS-98: convert local expiration tests to use the test peer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b672bf07
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b672bf07
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b672bf07
Branch: refs/heads/master
Commit: b672bf0799ca8a646e4bf84d5d992d267e072622
Parents: 4ad5a68
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 28 17:06:52 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 28 17:46:45 2015 +0100
----------------------------------------------------------------------
.../MessageExpirationIntegrationTest.java | 265 +++++++++++++++++++
.../jms/integration/SessionIntegrationTest.java | 2 +-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 32 ++-
.../JmsExpiredMessageConsumptionTest.java | 247 -----------------
4 files changed, 292 insertions(+), 254 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
new file mode 100644
index 0000000..b0f222b
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageExpirationIntegrationTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageExpirationIntegrationTest extends QpidJmsTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageExpirationIntegrationTest.class);
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout=20000)
+ public void testIncomingExpiredMessageGetsFiltered() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach and send credit, then send it an
+ // already-expired message followed by a live message.
+ testPeer.expectReceiverAttach();
+
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired"));
+
+ String liveMsgContent = "valid";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Call receive, expect the first message to be filtered due to expiry,
+ // and the second message to be given to the test app and accepted.
+ ModifiedMatcher modified = new ModifiedMatcher();
+ modified.withDeliveryFailed(equalTo(true));
+ modified.withUndeliverableHere(equalTo(true));
+
+ testPeer.expectDisposition(true, modified, 1, 1);
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+ Message m = consumer.receive(3000);
+ assertNotNull("Message should have been received", m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)m).getText());
+
+ // Verify the other message is not there
+ m = consumer.receive(10);
+ assertNull("Message should not have been received", m);
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testIncomingExpiredMessageGetsConsumedWhenFilterDisabled() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessageExpiry=false");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach and send credit, then send it an
+ // already-expired message followed by a live message.
+ testPeer.expectReceiverAttach();
+
+ String expiredMsgContent = "already-expired";
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMsgContent));
+
+ String liveMsgContent = "valid";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Call receive, expect the expired message as we disabled local expiry.
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+
+ Message m = consumer.receive(3000);
+ assertNotNull("Message should have been received", m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Unexpected message content", expiredMsgContent, ((TextMessage)m).getText());
+
+ // Verify the other message is there
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+ m = consumer.receive(3000);
+ assertNotNull("Message should have been received", m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Unexpected message content", liveMsgContent, ((TextMessage)m).getText());
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testIncomingExpiredMessageGetsFilteredAsync() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach and send credit, then send it an
+ // already-expired message followed by a live message.
+ testPeer.expectReceiverAttach();
+
+ String expiredMessageContent = "already-expired";
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMessageContent));
+
+ final String liveMsgContent = "valid";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Add message listener, expect the first message to be filtered due to expiry,
+ // and the second message to be given to the test app and accepted.
+ ModifiedMatcher modified = new ModifiedMatcher();
+ modified.withDeliveryFailed(equalTo(true));
+ modified.withUndeliverableHere(equalTo(true));
+
+ testPeer.expectDisposition(true, modified, 1, 1);
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+ final CountDownLatch success = new CountDownLatch(1);
+ final AtomicBoolean listenerFailure = new AtomicBoolean();
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message incoming) {
+ try {
+ TextMessage textMessage = (TextMessage) incoming;
+ if (liveMsgContent.equals(textMessage.getText())) {
+ success.countDown();
+ } else {
+ listenerFailure.set(true);
+ LOG.error("Received unexpected message:" + incoming);
+ }
+ } catch (Exception e) {
+ listenerFailure.set(true);
+ LOG.error("Exception in listener", e);
+ }
+ }
+ });
+
+ assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
+ assertFalse("There was a failure in the listener, see logs", listenerFailure.get());
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testIncomingExpiredMessageGetsConsumedWhenFilterDisabledAsync() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.localMessageExpiry=false");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach and send credit, then send it an
+ // already-expired message followed by a live message.
+ testPeer.expectReceiverAttach();
+
+ final String expiredMessageContent = "already-expired";
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType(expiredMessageContent));
+
+ final String liveMsgContent = "valid";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Add message listener, expect both messages as the filter is disabled
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+ final CountDownLatch success = new CountDownLatch(2);
+ final AtomicBoolean listenerFailure = new AtomicBoolean();
+ consumer.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message incoming) {
+ try {
+ TextMessage textMessage = (TextMessage) incoming;
+ if (expiredMessageContent.equals(textMessage.getText()) || liveMsgContent.equals(textMessage.getText())) {
+ success.countDown();
+ } else {
+ listenerFailure.set(true);
+ LOG.error("Received unexpected message:" + incoming);
+ }
+ } catch (Exception e) {
+ listenerFailure.set(true);
+ LOG.error("Exception in listener", e);
+ }
+ }
+ });
+
+ assertTrue("didn't get expected messages", success.await(5, TimeUnit.SECONDS));
+ assertFalse("There was a failure in the listener, see logs", listenerFailure.get());
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cc61286..0dd4ece 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1430,7 +1430,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
//Expect the session close
testPeer.expectEnd(false);
- testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession();
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1);
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
testPeer.remotelyEndLastOpenedSession(false, 200);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 01b7e3b..aab25a3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1358,6 +1358,11 @@ public class TestAmqpPeer implements AutoCloseable
public void expectDisposition(boolean settled, Matcher<?> stateMatcher)
{
+ expectDisposition(settled, stateMatcher, null, null);
+ }
+
+ public void expectDisposition(boolean settled, Matcher<?> stateMatcher, Integer firstDeliveryId, Integer lastDeliveryId)
+ {
Matcher<Boolean> settledMatcher = null;
if(settled)
{
@@ -1368,8 +1373,20 @@ public class TestAmqpPeer implements AutoCloseable
settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
}
+ Matcher<?> firstDeliveryIdMatcher = notNullValue();
+ if(firstDeliveryId != null) {
+ firstDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(firstDeliveryId));
+ }
+
+ Matcher<?> lastDeliveryIdMatcher = notNullValue();
+ if(lastDeliveryId != null) {
+ lastDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(lastDeliveryId));
+ }
+
addHandler(new DispositionMatcher()
.withSettled(settledMatcher)
+ .withFirst(firstDeliveryIdMatcher)
+ .withLast(lastDeliveryIdMatcher)
.withState(stateMatcher));
}
@@ -1539,22 +1556,25 @@ public class TestAmqpPeer implements AutoCloseable
return comp;
}
- public void sendTransferToLastOpenedLinkOnLastOpenedSession() {
+ public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
+ final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
+ final PropertiesDescribedType propertiesDescribedType,
+ final ApplicationPropertiesDescribedType appPropertiesDescribedType,
+ final DescribedType content,
+ final int nextIncomingDeliveryId) {
synchronized (_handlersLock) {
CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
- final int nextId = 0; //TODO: shouldn't be hard coded
-
- String tagString = "theDeliveryTag" + nextId;
+ String tagString = "theDeliveryTag" + nextIncomingDeliveryId;
Binary dtag = new Binary(tagString.getBytes());
final TransferFrame transferResponse = new TransferFrame()
- .setDeliveryId(UnsignedInteger.valueOf(nextId))
+ .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
.setDeliveryTag(dtag)
.setMessageFormat(UnsignedInteger.ZERO)
.setSettled(false);
- Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("myTextMessage"));
+ Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);
// The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b672bf07/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
deleted file mode 100644
index 0920f38..0000000
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsExpiredMessageConsumptionTest.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.consumer;
-
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.QueueViewMBean;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.qpid.jms.JmsConnection;
-import org.apache.qpid.jms.support.AmqpTestSupport;
-import org.apache.qpid.jms.support.Wait;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JmsExpiredMessageConsumptionTest extends AmqpTestSupport {
-
- protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class);
-
- @Override
- protected void addAdditionalBrokerPlugins(List<BrokerPlugin> plugins) {
- @SuppressWarnings("unchecked")
- BrokerPlugin expireOutbound = new BrokerPluginSupport() {
-
- @Override
- public void preProcessDispatch(MessageDispatch messageDispatch) {
- if (messageDispatch.getMessage() != null) {
- LOG.trace("Preprocessing dispatch: {}", messageDispatch.getMessage().getMessageId());
- if (messageDispatch.getMessage().getExpiration() != 0) {
- messageDispatch.getMessage().setExpiration(System.currentTimeMillis() - 1000);
- }
- }
-
- super.preProcessDispatch(messageDispatch);
- }
- };
-
- plugins.add(expireOutbound);
- }
-
- @Override
- protected void configureBrokerPolicies(BrokerService broker) {
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry defaultEntry = new PolicyEntry();
- defaultEntry.setExpireMessagesPeriod(60000);
- defaultEntry.setUseCache(false);
- policyMap.setDefaultEntry(defaultEntry);
-
- broker.setDestinationPolicy(policyMap);
- }
-
- @Test(timeout = 60000)
- public void testConsumerExpiredMessageSync() throws Exception {
- connection = createAmqpConnection();
- connection.start();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
-
- MessageConsumer consumer = session.createConsumer(queue);
- MessageProducer producer = session.createProducer(queue);
-
- producer.setTimeToLive(500);
- producer.send(session.createTextMessage("Message-1"));
-
- producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
- producer.send(session.createTextMessage("Message-2"));
-
- Message received = consumer.receive(5000);
- assertNotNull(received);
- TextMessage textMessage = (TextMessage) received;
- assertTrue(textMessage.getText().endsWith("2"));
-
- final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
- assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return proxy.getQueueSize() == 0;
- }
- }));
-
- final QueueViewMBean dlqProxy = getProxyToQueue("ActiveMQ.DLQ");
- assertTrue("Queued message did not get sent to DLQ.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return dlqProxy.getQueueSize() == 1;
- }
- }));
- }
-
- @Test(timeout = 60000)
- public void testConsumerExpiredMessageAsync() throws Exception {
- connection = createAmqpConnection();
- connection.start();
-
- final CountDownLatch success = new CountDownLatch(1);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
-
- MessageConsumer consumer = session.createConsumer(queue);
- MessageProducer producer = session.createProducer(queue);
-
- producer.setTimeToLive(500);
- producer.send(session.createTextMessage("Message-1"));
-
- producer.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
- producer.send(session.createTextMessage("Message-2"));
-
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message incoming) {
- TextMessage textMessage = (TextMessage) incoming;
- try {
- if (textMessage.getText().endsWith("2")) {
- success.countDown();
- }
- } catch (JMSException e) {
- }
- }
- });
-
- assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
-
- final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
- assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return proxy.getQueueSize() == 0;
- }
- }));
- }
-
- @Test(timeout = 60000)
- public void testConsumerExpirationFilterDisabledSync() throws Exception {
- connection = createAmqpConnection();
- connection.start();
-
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.setLocalMessageExpiry(false);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
-
- MessageConsumer consumer = session.createConsumer(queue);
- MessageProducer producer = session.createProducer(queue);
-
- producer.setTimeToLive(500);
- producer.send(session.createTextMessage("Message-1"));
-
- Message received = consumer.receive(5000);
- assertNotNull(received);
- TextMessage textMessage = (TextMessage) received;
- assertTrue(textMessage.getText().endsWith("1"));
-
- final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
- assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return proxy.getQueueSize() == 0;
- }
- }));
- }
-
- @Test(timeout = 60000)
- public void testConsumerExpirationFilterDisabledAsync() throws Exception {
- connection = createAmqpConnection();
- connection.start();
-
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.setLocalMessageExpiry(false);
-
- final CountDownLatch success = new CountDownLatch(1);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
-
- MessageConsumer consumer = session.createConsumer(queue);
- MessageProducer producer = session.createProducer(queue);
-
- producer.setTimeToLive(500);
- producer.send(session.createTextMessage("Message-1"));
-
- consumer.setMessageListener(new MessageListener() {
-
- @Override
- public void onMessage(Message incoming) {
- TextMessage textMessage = (TextMessage) incoming;
- try {
- if (textMessage.getText().endsWith("1")) {
- success.countDown();
- }
- } catch (JMSException e) {
- }
- }
- });
-
- assertTrue("didn't get expected message", success.await(5, TimeUnit.SECONDS));
-
- final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
- assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return proxy.getQueueSize() == 0;
- }
- }));
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-98: convert the
ZeroPrefetch+Expiry test to use the test peer rather than broker
Posted by ro...@apache.org.
QPIDJMS-98: convert the ZeroPrefetch+Expiry test to use the test peer rather than broker
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10b5ed82
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10b5ed82
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10b5ed82
Branch: refs/heads/master
Commit: 10b5ed824daea964addfc4113ea48fe45125d56d
Parents: b672bf0
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 28 17:36:36 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 28 17:47:41 2015 +0100
----------------------------------------------------------------------
.../ZeroPrefetchIntegrationTest.java | 92 ++++++++++++++++++++
.../qpid/jms/consumer/JmsZeroPrefetchTest.java | 33 -------
2 files changed, 92 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10b5ed82/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
new file mode 100644
index 0000000..37ffeba
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout=20000)
+ public void testZeroPrefetchConsumerReceiveWithMessageExpiredInFlight() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Create a connection with zero prefetch
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach but NOT send credit
+ testPeer.expectReceiverAttach();
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Expect that once receive is called, it flows a credit, give it an already-expired message.
+ // Expect it to be filtered due to local expiration checking.
+ PropertiesDescribedType props = new PropertiesDescribedType();
+ props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired"));
+
+ ModifiedMatcher modifiedMatcher = new ModifiedMatcher();
+ modifiedMatcher.withDeliveryFailed(equalTo(true));
+ modifiedMatcher.withUndeliverableHere(equalTo(true));
+
+ testPeer.expectDisposition(true, modifiedMatcher, 1, 1);
+
+ // Expect the client to then flow another credit requesting a message.
+ testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
+
+ // Send it a live message, expect it to get accepted.
+ String liveMsgContent = "valid";
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
+
+ Message m = consumer.receive(5000);
+ assertNotNull("Message should have been received", m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText());
+
+ testPeer.waitForAllHandlersToComplete(3000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10b5ed82/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
index f8b911b..ead4f5d 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java
@@ -312,37 +312,4 @@ public class JmsZeroPrefetchTest extends AmqpTestSupport {
assertNull(message);
}
- @Test(timeout=20000)
- public void testConsumerReceivePrefetchZeroMessageExpiredInFlight() throws Exception {
- connection = createAmqpConnection();
- connection.start();
-
- JmsConnection jmsConnection = (JmsConnection) connection;
- jmsConnection.getPrefetchPolicy().setAll(0);
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(name.getMethodName());
- MessageProducer producer = session.createProducer(queue);
- TextMessage expiredMessage = session.createTextMessage("expired message");
- TextMessage validMessage = session.createTextMessage("valid message");
- producer.send(expiredMessage, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, 50);
- producer.send(validMessage);
- session.close();
-
- session = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(queue);
- Message message = consumer.receive(3000);
- assertNotNull(message);
- TextMessage received = (TextMessage) message;
- assertEquals("expired message", received.getText());
-
- // Rollback allow the first message to expire.
- session.rollback();
-
- // Consume again, this should fetch the second valid message via a pull.
- message = consumer.receive(3000);
- assertNotNull(message);
- received = (TextMessage) message;
- assertEquals("valid message", received.getText());
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org