You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/08 11:29:16 UTC
qpid-broker-j git commit: QPID-6933: [System Tests] Move
TimeToLiveTest into JMS 1.1 system tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 910f439bd -> 377315ba5
QPID-6933: [System Tests] Move TimeToLiveTest into JMS 1.1 system tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/377315ba
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/377315ba
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/377315ba
Branch: refs/heads/master
Commit: 377315ba5d272d0c4125b6f8deb88b8e1de4c38e
Parents: 910f439
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 11:28:08 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Dec 8 11:28:08 2017 +0000
----------------------------------------------------------------------
.../org/apache/qpid/systests/JmsTestBase.java | 6 +
.../jms_1_1/message/TimeToLiveTest.java | 182 +++++++++
.../jms_1_1/topic/DurableSubscribtionTest.java | 4 +-
.../jms_1_1/topic/TopicSessionTest.java | 7 -
.../jms_1_1/topic/TopicSubscriberTest.java | 2 +-
.../qpid/server/queue/TimeToLiveTest.java | 399 -------------------
test-profiles/CPPExcludes | 4 -
7 files changed, 191 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
index 9be3f1c..806d35b 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/JmsTestBase.java
@@ -33,6 +33,7 @@ import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
+import javax.jms.TopicConnection;
import javax.naming.NamingException;
import org.junit.BeforeClass;
@@ -194,4 +195,9 @@ public abstract class JmsTestBase extends BrokerAdminUsingTestBase
return (Map<String, Object>) statistics;
}
+
+ protected TopicConnection getTopicConnection() throws JMSException, NamingException // convenience for topic-based tests: same as getConnection(), typed as TopicConnection
+ {
+ return (TopicConnection) getConnection(); // plain downcast; presumably the configured connection always implements TopicConnection — otherwise this fails with ClassCastException
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java
new file mode 100644
index 0000000..db6b01f
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/TimeToLiveTest.java
@@ -0,0 +1,182 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.jms_1_1.message;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSubscriber;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+
+public class TimeToLiveTest extends JmsTestBase // JMS 1.1 system tests: each test sends "A" with a time-to-live and "B" without one, then checks which of them is delivered
+{
+ @Test
+ public void testPassiveTTL() throws Exception // "A" outlives its TTL on the queue before any consumer exists; only "B" may be delivered
+ {
+ Queue queue = createQueue(getTestName());
+ Connection connection = getConnection();
+ long timeToLiveMillis = getReceiveTimeout(); // TTL equals the receive timeout supplied by the test base
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setTimeToLive(timeToLiveMillis);
+ producer.send(session.createTextMessage("A")); // sent with the TTL above
+ producer.setTimeToLive(0); // zero means the following message never expires
+ producer.send(session.createTextMessage("B"));
+ session.commit(); // both messages are published before the wait starts
+
+ Thread.sleep(timeToLiveMillis); // NOTE(review): waits exactly the TTL; presumably enough for "A" to count as expired — confirm there is no boundary race
+
+ MessageConsumer consumer = session.createConsumer(queue); // consumer is created only after the wait
+ connection.start();
+ Message message = consumer.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received", message instanceof TextMessage);
+ assertEquals("Unexpected message received", "B", ((TextMessage)message).getText()); // the first delivery must be "B", i.e. "A" was not delivered
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testActiveTTL() throws Exception // "A" is delivered once while still live, then must not come back after its TTL has elapsed and the session is rolled back
+ {
+ Queue queue = createQueue(getTestName());
+ Connection connection = getConnection();
+ long timeToLiveMillis = getReceiveTimeout() * 2; // twice the receive timeout, so "A" is still deliverable on the first receive
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageProducer producer = session.createProducer(queue);
+ producer.setTimeToLive(timeToLiveMillis);
+ producer.send(session.createTextMessage("A")); // sent with the TTL above
+ producer.setTimeToLive(0); // zero means the following message never expires
+ producer.send(session.createTextMessage("B"));
+ session.commit();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ Message message = consumer.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received", message instanceof TextMessage);
+ assertEquals("Unexpected message received", "A", ((TextMessage) message).getText()); // receipt of "A" is deliberately left uncommitted
+
+ Thread.sleep(timeToLiveMillis); // NOTE(review): waits exactly the TTL — confirm there is no boundary race with the expiry time
+
+ session.rollback(); // undoes the uncommitted receipt of "A"; the test expects the now-expired "A" not to be redelivered
+ message = consumer.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received after waiting for TTL", message instanceof TextMessage);
+ assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) message).getText());
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testPassiveTTLWithDurableSubscription() throws Exception // NOTE(review): this follows the receive/sleep/rollback flow of testActiveTTL, not testPassiveTTL — the two durable test names look swapped; confirm
+ {
+ long timeToLiveMillis = getReceiveTimeout() * 2; // twice the receive timeout, so "A" is still deliverable on the first receive
+ String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ TopicConnection connection = getTopicConnection();
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName); // subscription is created before publishing so it receives both messages
+ MessageProducer producer = session.createProducer(topic);
+ producer.setTimeToLive(timeToLiveMillis);
+ producer.send(session.createTextMessage("A")); // sent with the TTL above
+ producer.setTimeToLive(0); // zero means the following message never expires
+ producer.send(session.createTextMessage("B"));
+ session.commit();
+
+ connection.start();
+ Message message = durableSubscriber.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received", message instanceof TextMessage);
+ assertEquals("Unexpected message received", "A", ((TextMessage)message).getText()); // receipt of "A" is deliberately left uncommitted
+
+ Thread.sleep(timeToLiveMillis); // NOTE(review): waits exactly the TTL — confirm there is no boundary race with the expiry time
+
+ session.rollback(); // undoes the uncommitted receipt of "A"; the test expects the now-expired "A" not to be redelivered
+ message = durableSubscriber.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received after waiting for TTL", message instanceof TextMessage);
+ assertEquals("Unexpected message received after waiting for TTL", "B", ((TextMessage) message).getText());
+ }
+ finally
+ {
+ connection.close(); // NOTE(review): the durable subscription is never unsubscribed in this test — confirm it is cleaned up elsewhere
+ }
+ }
+
+ @Test
+ public void testActiveTTLWithDurableSubscription() throws Exception // NOTE(review): this follows the sleep-then-receive flow of testPassiveTTL, not testActiveTTL — the two durable test names look swapped; confirm
+ {
+ long timeToLiveMillis = getReceiveTimeout(); // TTL equals the receive timeout supplied by the test base
+ String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ TopicConnection connection = getTopicConnection();
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber durableSubscriber = session.createDurableSubscriber(topic, subscriptionName); // subscription is created before publishing so it receives both messages
+ MessageProducer producer = session.createProducer(topic);
+ producer.setTimeToLive(timeToLiveMillis);
+ producer.send(session.createTextMessage("A")); // sent with the TTL above
+ producer.setTimeToLive(0); // zero means the following message never expires
+ producer.send(session.createTextMessage("B"));
+ session.commit();
+
+ Thread.sleep(timeToLiveMillis); // NOTE(review): waits exactly the TTL; presumably enough for "A" to count as expired — confirm there is no boundary race
+
+ connection.start(); // delivery to the subscriber starts only after the wait
+ Message message = durableSubscriber.receive(getReceiveTimeout());
+
+ assertTrue("TextMessage should be received", message instanceof TextMessage);
+ assertEquals("Unexpected message received", "B", ((TextMessage)message).getText()); // the first delivery must be "B", i.e. "A" was not delivered
+ }
+ finally
+ {
+ connection.close(); // NOTE(review): the durable subscription is never unsubscribed in this test — confirm it is cleaned up elsewhere
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
index 249dc18..c759207 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
@@ -619,7 +619,7 @@ public class DurableSubscribtionTest extends JmsTestBase
getBrokerAdmin().restart();
- TopicConnection publisherConnection = (TopicConnection) getConnection();
+ TopicConnection publisherConnection = getTopicConnection();
try
{
TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -740,7 +740,7 @@ public class DurableSubscribtionTest extends JmsTestBase
}
//send messages matching and not matching the original used selector
- TopicConnection publisherConnection = (TopicConnection) getConnection();
+ TopicConnection publisherConnection = getTopicConnection();
try
{
TopicSession session = publisherConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java
index 3ea05d7..f31533c 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSessionTest.java
@@ -22,14 +22,12 @@ package org.apache.qpid.systests.jms_1_1.topic;
import static org.junit.Assert.fail;
-import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
-import javax.naming.NamingException;
import org.junit.Test;
@@ -124,9 +122,4 @@ public class TopicSessionTest extends JmsTestBase
}
}
- private TopicConnection getTopicConnection() throws JMSException, NamingException
- {
- return (TopicConnection)getConnection();
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java
index 18d40e8..bfa19f9 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/TopicSubscriberTest.java
@@ -47,7 +47,7 @@ public class TopicSubscriberTest extends JmsTestBase
public void messageDeliveredToAllSubscribers() throws Exception
{
Topic topic = createTopic(getTestName());
- final TopicConnection connection = (TopicConnection) getConnection();
+ final TopicConnection connection = getTopicConnection();
try
{
final TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java b/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java
deleted file mode 100644
index 918bdde..0000000
--- a/systests/src/test/java/org/apache/qpid/server/queue/TimeToLiveTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.queue;
-
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
-import javax.jms.TopicSubscriber;
-import javax.naming.NamingException;
-
-import org.junit.Assert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class TimeToLiveTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(TimeToLiveTest.class);
-
- protected final String QUEUE = "TimeToLiveQueue";
-
- private final long TIME_TO_LIVE = 100L;
-
- private static final int MSG_COUNT = 50;
- private static final long SERVER_TTL_TIMEOUT = 60000L;
- private long _shortReceiveTimeout;
- private long _longReceiveTimeout;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- _longReceiveTimeout = getLongReceiveTimeout();
- _shortReceiveTimeout = getShortReceiveTimeout();
- }
-
- public void testPassiveTTLWithPrefetch() throws Exception
- {
- doTestPassiveTTL(true);
- }
-
- public void testPassiveTTL() throws Exception
- {
- doTestPassiveTTL(false);
-
- }
-
- private void doTestPassiveTTL(boolean prefetchMessages) throws JMSException, NamingException
- {
- //Create Client 1
- Connection clientConnection = getConnection();
-
- Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createTestQueue(clientSession, QUEUE);
-
-
- //Create Producer
- Connection producerConnection = getConnection();
-
- producerConnection.start();
-
- // Move to a Transacted session to ensure that all messages have been delivered to broker before
- // we start waiting for TTL
- Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer producer = producerSession.createProducer(queue);
-
- MessageConsumer consumer = clientSession.createConsumer(queue);
- if(prefetchMessages)
- {
- clientConnection.start();
- }
-
- //Set TTL
- int msg = 0;
- producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
-
- producer.setTimeToLive(TIME_TO_LIVE);
-
- for (; msg < MSG_COUNT - 2; msg++)
- {
- producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
- }
-
- //Reset TTL
- producer.setTimeToLive(0L);
- producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
-
- producerSession.commit();
-
-
- // Ensure we sleep the required amount of time.
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- final long MILLIS = 1000000L;
-
- long waitTime = TIME_TO_LIVE * MILLIS;
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- if(prefetchMessages)
- {
- clientConnection.close();
- clientConnection = getConnection();
-
- clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = clientSession.createQueue(QUEUE);
- consumer = clientSession.createConsumer(queue);
- }
-
- clientConnection.start();
-
- Message receivedFirst = consumer.receive(_longReceiveTimeout);
- Message receivedSecond = consumer.receive(_longReceiveTimeout);
- Message receivedThird = consumer.receive(_shortReceiveTimeout);
-
- // Log the messages to help diagnosis incase of failure
- LOGGER.info("First:"+receivedFirst);
- LOGGER.info("Second:"+receivedSecond);
- LOGGER.info("Third:"+receivedThird);
-
- // Only first and last messages sent should survive expiry
- Assert.assertNull("More messages received", receivedThird);
-
- Assert.assertNotNull("First message not received", receivedFirst);
- Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
- Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
-
- Assert.assertNotNull("Final message not received", receivedSecond);
- Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
- Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
-
- clientConnection.close();
-
- producerConnection.close();
- }
-
- private Message nextMessage(String msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
- {
- Message send = producerSession.createTextMessage("Message " + msg);
- send.setBooleanProperty("first", first);
- send.setStringProperty("testprop", "TimeToLiveTest");
- send.setLongProperty("TTL", producer.getTimeToLive());
- return send;
- }
-
-
- /**
- * Tests the expired messages get actively deleted even on queues which have no consumers
- * @throws Exception
- */
- public void testActiveTTL() throws Exception
- {
- Connection producerConnection = getConnection();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = createTestQueue(producerSession);
-
- MessageProducer producer = producerSession.createProducer(queue);
- producer.setTimeToLive(1000L);
-
- // send Messages
- for(int i = 0; i < MSG_COUNT; i++)
- {
- producer.send(producerSession.createTextMessage("Message: "+i));
- }
- long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
-
- // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
- long messageCount = MSG_COUNT;
- long lastPass;
-
- do
- {
- lastPass = messageCount;
- Thread.sleep(100);
- producerConnection.start();
- messageCount = getQueueDepth(producerConnection, queue);
-
- // If we have received messages in the last loop then extend the timeout time.
- // if we get messages stuck that are not expiring then the failureTime will occur
- // failing the test. This will help with the scenario when the broker does not
- // have enough CPU cycles to process the TTLs.
- if (lastPass != messageCount)
- {
- failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
- }
- }
- while(messageCount > 0L && System.currentTimeMillis() < failureTime);
-
- assertEquals("Messages not automatically expired: ", 0L, messageCount);
-
- producer.close();
- producerSession.close();
- producerConnection.close();
- }
-
- public void testPassiveTTLwithDurableSubscription() throws Exception
- {
- //Create Client 1
- TopicConnection clientConnection = (TopicConnection) getConnection();
-
- Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create and close the durable subscriber
- Topic topic = createTopic(clientConnection, getTestQueueName());
- TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
- durableSubscriber.close();
-
- //Create Producer
- Connection producerConnection = getConnection();
-
- producerConnection.start();
-
- // Move to a Transacted session to ensure that all messages have been delivered to broker before
- // we start waiting for TTL
- Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageProducer producer = producerSession.createProducer(topic);
-
- //Set TTL
- int msg = 0;
- producer.send(nextMessage(String.valueOf(msg), true, producerSession, producer));
-
- producer.setTimeToLive(TIME_TO_LIVE);
-
- for (; msg < MSG_COUNT - 2; msg++)
- {
- producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
- }
-
- //Reset TTL
- producer.setTimeToLive(0L);
- producer.send(nextMessage(String.valueOf(msg), false, producerSession, producer));
-
- producerSession.commit();
-
- //resubscribe
- durableSubscriber = clientSession.createDurableSubscriber(topic, getTestQueueName(),"testprop='TimeToLiveTest'", false);
-
- // Ensure we sleep the required amount of time.
- ReentrantLock waitLock = new ReentrantLock();
- Condition wait = waitLock.newCondition();
- final long MILLIS = 1000000L;
-
- long waitTime = TIME_TO_LIVE * MILLIS;
- while (waitTime > 0)
- {
- try
- {
- waitLock.lock();
-
- waitTime = wait.awaitNanos(waitTime);
- }
- catch (InterruptedException e)
- {
- //Stop if we are interrupted
- fail(e.getMessage());
- }
- finally
- {
- waitLock.unlock();
- }
-
- }
-
- clientConnection.start();
-
- Message receivedFirst = durableSubscriber.receive(_longReceiveTimeout);
- Message receivedSecond = durableSubscriber.receive(_longReceiveTimeout);
- Message receivedThird = durableSubscriber.receive(getShortReceiveTimeout());
-
- // Log the messages to help diagnosis incase of failure
- LOGGER.info("First:"+receivedFirst);
- LOGGER.info("Second:"+receivedSecond);
- LOGGER.info("Third:"+receivedThird);
-
- // Only first and last messages sent should survive expiry
- Assert.assertNull("More messages received", receivedThird);
-
- Assert.assertNotNull("First message not received", receivedFirst);
- Assert.assertTrue("First message doesn't have first set.", receivedFirst.getBooleanProperty("first"));
- Assert.assertEquals("First message has incorrect TTL.", 0L, receivedFirst.getLongProperty("TTL"));
-
- Assert.assertNotNull("Final message not received", receivedSecond);
- Assert.assertFalse("Final message has first set.", receivedSecond.getBooleanProperty("first"));
- Assert.assertEquals("Final message has incorrect TTL.", 0L, receivedSecond.getLongProperty("TTL"));
-
- durableSubscriber.close();
- clientSession.unsubscribe(getTestQueueName());
- clientConnection.close();
-
- producerConnection.close();
- }
-
- public void testActiveTTLwithDurableSubscription() throws Exception
- {
- //Create Client 1
- TopicConnection clientConnection = (TopicConnection) getConnectionBuilder().setClientId("clientid").build();
- Session clientSession = clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create and close the durable subscriber
- Topic topic = createTopic(clientConnection, getTestQueueName());
- TopicSubscriber durableSubscriber = clientSession.createDurableSubscriber(topic, "MyDurableTTLSubscription","testprop='TimeToLiveTest'", false);
- durableSubscriber.close();
-
- //Create Producer
- Connection producerConnection = getConnection();
- producerConnection.start();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = producerSession.createProducer(topic);
- producer.setTimeToLive(1000L);
-
- // send Messages
- for(int i = 0; i < MSG_COUNT; i++)
- {
- producer.send(producerSession.createTextMessage("Message: "+i));
- }
- long failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
-
- // check Queue depth for up to TIMEOUT seconds after the Queue Depth hasn't changed for 100ms.
- long messageCount = MSG_COUNT;
- long lastPass;
- Queue subcriptionQueue = isBroker10() ? producerSession.createQueue("qpidsub_/clientid_/MyDurableTTLSubscription_/durable") : new AMQQueue("amq.topic","clientid" + ":" + "MyDurableTTLSubscription");
- do
- {
- lastPass = messageCount;
- Thread.sleep(100);
- messageCount = getQueueDepth(producerConnection, subcriptionQueue);
-
- // If we have received messages in the last loop then extend the timeout time.
- // if we get messages stuck that are not expiring then the failureTime will occur
- // failing the test. This will help with the scenario when the broker does not
- // have enough CPU cycles to process the TTLs.
- if (lastPass != messageCount)
- {
- failureTime = System.currentTimeMillis() + 2 * SERVER_TTL_TIMEOUT;
- }
- }
- while(messageCount > 0L && System.currentTimeMillis() < failureTime);
-
- assertEquals("Messages not automatically expired: ", 0L, messageCount);
-
- producer.close();
- producerSession.close();
- producerConnection.close();
-
- clientSession.unsubscribe("MyDurableTTLSubscription");
- clientSession.close();
- clientConnection.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/377315ba/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 38b9091..ca9943a 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -60,10 +60,6 @@ org.apache.qpid.server.exchange.ReturnUnroutableMandatoryMessageTest#*
org.apache.qpid.test.unit.topic.DurableSubscriptionTest#testUnsubscribeWhenUsingSelectorMakesTopicUnreachable
org.apache.qpid.test.unit.client.connection.ExceptionListenerTest#testExceptionListenerConnectionStopDeadlock
-// c++ broker expires messages on delivery or when the queue cleaner thread runs.
-org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTL
-org.apache.qpid.server.queue.TimeToLiveTest#testActiveTTLwithDurableSubscription
-
// QPID-1727 , QPID-1726 :c++ broker does not support flow to disk on transient queues. Also it requries a persistent store impl. for Apache
org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org