You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/12/28 21:48:33 UTC
[1/3] qpid-broker-j git commit: QPID-6933: [System Tests] Refactor
last value queue tests as JMS 1.1 system test
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 40a7fdbe4 -> b1a7aacc4
QPID-6933: [System Tests] Refactor last value queue tests as JMS 1.1 system test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b1a7aacc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b1a7aacc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b1a7aacc
Branch: refs/heads/master
Commit: b1a7aacc4c61ff07a9da6d6c2ea4db3ee41e016d
Parents: b4e6fcf
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Dec 28 21:43:51 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Dec 28 21:48:24 2017 +0000
----------------------------------------------------------------------
.../extensions/queue/LastValueQueueTest.java | 598 +++++++++++++++++++
.../qpid/server/queue/LastValueQueueTest.java | 573 ------------------
test-profiles/CPPExcludes | 1 -
3 files changed, 598 insertions(+), 574 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
new file mode 100644
index 0000000..1313417
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/LastValueQueueTest.java
@@ -0,0 +1,598 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.jms_1_1.extensions.queue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.naming.NamingException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.queue.LastValueQueue;
+import org.apache.qpid.systests.JmsTestBase;
+
+public class LastValueQueueTest extends JmsTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueTest.class);
+
+ private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
+ private static final String KEY_PROPERTY = "key";
+
+ private static final int MSG_COUNT = 400;
+
+ /**
+ * Publishes MSG_COUNT messages to a conflation queue keyed on KEY_PROPERTY and verifies that
+ * a consumer attached afterwards receives exactly 10 messages: the last 10 sequence numbers,
+ * in ascending order.
+ */
+ @Test
+ public void testConflation() throws Exception
+ {
+ final String lvqName = getTestName();
+ createConflationQueue(lvqName, KEY_PROPERTY, false);
+ final Queue lvq = createQueue(lvqName);
+
+ sendMessages(lvq, 0, MSG_COUNT);
+
+ final Connection connection = getConnection();
+ try
+ {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer subscriber = session.createConsumer(lvq);
+ connection.start();
+
+ // Drain the queue: a receive that times out marks the end of the available messages.
+ final List<Message> drained = new ArrayList<>();
+ for (Message next = subscriber.receive(getReceiveTimeout());
+ next != null;
+ next = subscriber.receive(getReceiveTimeout()))
+ {
+ drained.add(next);
+ }
+
+ assertEquals("Unexpected number of messages received", 10, drained.size());
+
+ // Exactly 10 messages were drained, so walking the list covers sequence numbers
+ // MSG_COUNT - 10 .. MSG_COUNT - 1.
+ int expectedSequenceNumber = MSG_COUNT - 10;
+ for (Message survivor : drained)
+ {
+ assertEquals("Unexpected message number received",
+ expectedSequenceNumber++,
+ survivor.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ /**
+ * Verifies conflation across two consumer generations.
+ * <p>
+ * The first half of the messages is received on a CLIENT_ACKNOWLEDGE session that never
+ * acknowledges anything before its connection is closed (presumably releasing the delivered
+ * messages back to the queue - confirm against broker behaviour). After the second half has
+ * been published, a fresh consumer must receive only the last 10 sequence numbers.
+ */
+ @Test
+ public void testConflationWithRelease() throws Exception
+ {
+ final String lvqName = getTestName();
+ createConflationQueue(lvqName, KEY_PROPERTY, false);
+ final Queue lvq = createQueue(lvqName);
+ final int halfCount = MSG_COUNT / 2;
+
+ sendMessages(lvq, 0, halfCount);
+
+ Connection connection = getConnection();
+ try
+ {
+ final Session unackedSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ final MessageConsumer firstConsumer = unackedSession.createConsumer(lvq);
+ connection.start();
+
+ final List<Message> firstBatch = new ArrayList<>();
+ for (Message next = firstConsumer.receive(getReceiveTimeout());
+ next != null;
+ next = firstConsumer.receive(getReceiveTimeout()))
+ {
+ firstBatch.add(next);
+ }
+
+ assertEquals("Unexpected number of messages received", 10, firstBatch.size());
+
+ int expectedSequenceNumber = halfCount - 10;
+ for (Message delivered : firstBatch)
+ {
+ assertEquals("Unexpected message number received",
+ expectedSequenceNumber++,
+ delivered.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+ finally
+ {
+ // No message was acknowledged on this connection before it is closed.
+ connection.close();
+ }
+
+ sendMessages(lvq, halfCount, MSG_COUNT);
+
+ connection = getConnection();
+ try
+ {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer secondConsumer = session.createConsumer(lvq);
+ connection.start();
+
+ final List<Message> secondBatch = new ArrayList<>();
+ for (Message next = secondConsumer.receive(getReceiveTimeout());
+ next != null;
+ next = secondConsumer.receive(getReceiveTimeout()))
+ {
+ secondBatch.add(next);
+ }
+
+ assertEquals("Unexpected number of messages received", 10, secondBatch.size());
+
+ int expectedSequenceNumber = MSG_COUNT - 10;
+ for (Message delivered : secondBatch)
+ {
+ assertEquals("Unexpected message number received",
+ expectedSequenceNumber++,
+ delivered.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ /**
+ * Verifies conflation when newer messages are published while older, unacknowledged
+ * messages are still held by a consumer session.
+ * <p>
+ * The statement order in the first try block is significant: the consumer is closed, the
+ * second half of the messages is published, and only then is the CLIENT_ACKNOWLEDGE session
+ * closed without having acknowledged anything.
+ */
+ @Test
+ public void testConflationWithReleaseAfterNewPublish() throws Exception
+ {
+ String queueName = getTestName();
+ createConflationQueue(queueName, KEY_PROPERTY, false);
+ Queue queue = createQueue(queueName);
+
+ sendMessages(queue, 0, MSG_COUNT / 2);
+
+ Connection consumerConnection = getConnection();
+ try
+ {
+ // CLIENT_ACKNOWLEDGE and no call to acknowledge(): everything received below stays
+ // unacknowledged.
+ Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message received;
+ List<Message> messages = new ArrayList<>();
+ while ((received = consumer.receive(getReceiveTimeout())) != null)
+ {
+ messages.add(received);
+ }
+
+ assertEquals("Unexpected number of messages received", 10, messages.size());
+
+ // The 10 messages are expected to be the last 10 of the first half, in order.
+ for (int i = 0; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received",
+ MSG_COUNT / 2 - 10 + i,
+ msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ consumer.close();
+
+ // Publish the second half while the session holding the "old" messages is still open.
+ sendMessages(queue, MSG_COUNT / 2, MSG_COUNT);
+
+ // this causes the "old" messages to be released
+ consumerSession.close();
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+
+ consumerConnection = getConnection();
+ try
+ {
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(queue);
+ consumerConnection.start();
+
+ Message received;
+ List<Message> messages = new ArrayList<>();
+ while ((received = consumer.receive(getReceiveTimeout())) != null)
+ {
+ messages.add(received);
+ }
+
+ // Only the newest messages may be delivered to the fresh consumer.
+ assertEquals("Unexpected number of messages received", 10, messages.size());
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message msg = messages.get(i);
+ assertEquals("Unexpected message number received",
+ MSG_COUNT - 10 + i,
+ msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+
+ /**
+ * Publishes MSG_COUNT messages to a conflation queue and verifies that the depth reported by
+ * getTotalDepthOfQueuesMessages() is 10.
+ */
+ @Test
+ public void testConflatedQueueDepth() throws Exception
+ {
+ final String lvqName = getTestName();
+ createConflationQueue(lvqName, KEY_PROPERTY, false);
+
+ sendMessages(createQueue(lvqName), 0, MSG_COUNT);
+
+ final long depthAfterConflation = getTotalDepthOfQueuesMessages();
+ assertEquals(10, depthAfterConflation);
+ }
+
+ /**
+ * Verifies conflation on a queue created with ensureNondestructiveConsumers: the consumer
+ * first receives the last 10 sequence numbers and, once one further message has been
+ * published, receives exactly that one message.
+ */
+ @Test
+ public void testConflationBrowser() throws Exception
+ {
+ final String lvqName = getTestName();
+ createConflationQueue(lvqName, KEY_PROPERTY, true);
+ final Queue lvq = createQueue(lvqName);
+
+ sendMessages(lvq, 0, MSG_COUNT);
+
+ final Connection connection = getConnection();
+ try
+ {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final MessageConsumer browser = session.createConsumer(lvq);
+ connection.start();
+
+ final List<Message> browsed = new ArrayList<>();
+ for (Message next = browser.receive(getReceiveTimeout());
+ next != null;
+ next = browser.receive(getReceiveTimeout()))
+ {
+ browsed.add(next);
+ }
+
+ assertEquals("Unexpected number of messages received", 10, browsed.size());
+
+ int expectedSequenceNumber = MSG_COUNT - 10;
+ for (Message seen : browsed)
+ {
+ assertEquals("Unexpected message number received",
+ expectedSequenceNumber++,
+ seen.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ browsed.clear();
+
+ // Publish a single additional message carrying sequence number MSG_COUNT.
+ sendMessages(lvq, MSG_COUNT, MSG_COUNT + 1);
+
+ for (Message next = browser.receive(getReceiveTimeout());
+ next != null;
+ next = browser.receive(getReceiveTimeout()))
+ {
+ browsed.add(next);
+ }
+ assertEquals("Unexpected number of messages received", 1, browsed.size());
+
+ final Message newest = browsed.get(0);
+ assertEquals("Unexpected message number received",
+ MSG_COUNT,
+ newest.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ /**
+ * Attaches two consumers to a conflation queue created with ensureNondestructiveConsumers and
+ * verifies that each of them receives the last 10 sequence numbers, in ascending order.
+ */
+ @Test
+ public void testConflation2Browsers() throws Exception
+ {
+ final String lvqName = getTestName();
+ createConflationQueue(lvqName, KEY_PROPERTY, true);
+ final Queue lvq = createQueue(lvqName);
+
+ sendMessages(lvq, 0, MSG_COUNT);
+
+ final Connection connection = getConnection();
+ try
+ {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer firstBrowser = session.createConsumer(lvq);
+ final MessageConsumer secondBrowser = session.createConsumer(lvq);
+
+ connection.start();
+ final List<Message> seenByFirst = new ArrayList<>();
+ final List<Message> seenBySecond = new ArrayList<>();
+
+ // Poll both browsers in turn until a round yields nothing from either of them.
+ while (true)
+ {
+ final Message fromFirst = firstBrowser.receive(getReceiveTimeout());
+ final Message fromSecond = secondBrowser.receive(getReceiveTimeout());
+ if (fromFirst == null && fromSecond == null)
+ {
+ break;
+ }
+
+ if (fromFirst != null)
+ {
+ seenByFirst.add(fromFirst);
+ }
+ if (fromSecond != null)
+ {
+ seenBySecond.add(fromSecond);
+ }
+ }
+
+ assertEquals("Unexpected number of messages received on first browser", 10, seenByFirst.size());
+ assertEquals("Unexpected number of messages received on second browser", 10, seenBySecond.size());
+
+ for (int offset = 0; offset < 10; offset++)
+ {
+ final int expectedSequenceNumber = MSG_COUNT - 10 + offset;
+ assertEquals("Unexpected message number received on first browser",
+ expectedSequenceNumber,
+ seenByFirst.get(offset).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ assertEquals("Unexpected message number received on second browser",
+ expectedSequenceNumber,
+ seenBySecond.get(offset).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ /**
+ * Runs two background producers against the same conflation queue while a consumer is
+ * receiving, then verifies that the last sequence number received for each key equals the
+ * last sequence number sent for that key.
+ * <p>
+ * Both producers send the same sequence of keys and sequence numbers, so their "last sent"
+ * maps are expected to be equal and to contain 2 keys.
+ */
+ @Test
+ public void testParallelProductionAndConsumption() throws Exception
+ {
+ String queueName = getTestName();
+ createConflationQueue(queueName, KEY_PROPERTY, true);
+ Queue queue = createQueue(queueName);
+
+ // Start producing threads that send messages
+ BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1", queue);
+ messageProducer1.startSendingMessages();
+ BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2", queue);
+ messageProducer2.startSendingMessages();
+
+ Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1, queue);
+
+ messageProducer1.join();
+ messageProducer2.join();
+
+ // Check BOTH producers for failures, and do so before comparing the maps: a producer that
+ // died part-way leaves an incomplete map, and the resulting map mismatch would otherwise
+ // hide the real cause.
+ assertNull("Unexpected exception from background producer thread 1", messageProducer1.getException());
+ assertNull("Unexpected exception from background producer thread 2", messageProducer2.getException());
+
+ final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
+ assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
+ final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
+ assertEquals(lastSentMessages1, lastSentMessages2);
+
+ assertEquals("The last message sent for each key should match the last message received for that key",
+ lastSentMessages1, lastReceivedMessages);
+ }
+
+ /**
+ * Receives from the queue until two shutdown marker messages have been seen, recording the
+ * sequence number of the most recently received message for each key.
+ * <p>
+ * Receiving starts only after the given producer reports that a quarter of its messages
+ * have been sent. A receive that times out fails the test.
+ *
+ * @param producer the background producer whose progress is awaited before consuming
+ * @param queue the queue to consume from
+ * @return the last received sequence number for each key value
+ */
+ private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer, final Queue queue) throws Exception
+ {
+ producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
+
+ final Map<String, Integer> lastSequenceNumberByKey = new HashMap<>();
+ final Connection connection = getConnectionBuilder().setPrefetch(1).build();
+ try
+ {
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ LOGGER.info("Starting to receive");
+
+ final MessageConsumer subscriber = session.createConsumer(queue);
+ connection.start();
+
+ int shutdownsSeen = 0;
+ int messagesSeen = 0;
+ // One shutdown marker is expected from each of the two background producers.
+ while (shutdownsSeen < 2)
+ {
+ final Message incoming = subscriber.receive(getReceiveTimeout());
+ final String timeoutDescription = "null received after "
+ + messagesSeen
+ + " messages and "
+ + shutdownsSeen
+ + " shutdowns";
+ assertNotNull(timeoutDescription, incoming);
+
+ if (!incoming.propertyExists(BackgroundMessageProducer.SHUTDOWN))
+ {
+ messagesSeen++;
+ putMessageInMap(incoming, lastSequenceNumberByKey);
+ }
+ else
+ {
+ shutdownsSeen++;
+ }
+ }
+
+ LOGGER.info("Finished receiving. Received " + messagesSeen + " message(s) in total");
+ }
+ finally
+ {
+ connection.close();
+ }
+ return lastSequenceNumberByKey;
+ }
+
+ /**
+ * Stores the message's sequence number in the map under the message's key property value,
+ * replacing any sequence number previously stored for that key.
+ *
+ * @param message the message to read KEY_PROPERTY and MESSAGE_SEQUENCE_NUMBER_PROPERTY from
+ * @param messageSequenceNumbersByKey the map to update
+ * @throws JMSException if either property cannot be read from the message
+ */
+ private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
+ {
+ messageSequenceNumbersByKey.put(message.getStringProperty(KEY_PROPERTY),
+ message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
+ }
+
+ /**
+ * Sends MSG_COUNT messages, cycling over 2 key values, from a dedicated thread, followed by a
+ * single message carrying the SHUTDOWN property.
+ * <p>
+ * Any exception raised on the sending thread is logged and made available through
+ * {@link #getException()}. The map returned by {@link #getMessageSequenceNumbersByKey()} is
+ * written by the sending thread only; read it after {@link #join()} has returned.
+ */
+ private class BackgroundMessageProducer
+ {
+ static final String SHUTDOWN = "SHUTDOWN";
+
+ private final String _threadName;
+ private final Queue _queue;
+
+ // Written by the sending thread, read by the test thread.
+ private volatile Exception _exception;
+
+ private Thread _thread;
+ private final Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>();
+ // Counted down once per sent message; reaches zero after MSG_COUNT / 4 sends.
+ private final CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT / 4);
+
+ /**
+ * @param threadName name given to the sending thread; also used as the key of the shutdown message
+ * @param queue the queue to publish to
+ */
+ BackgroundMessageProducer(String threadName, Queue queue)
+ {
+ _threadName = threadName;
+ _queue = queue;
+ }
+
+ /**
+ * Blocks until a quarter of the messages have been sent, failing the test if that does
+ * not happen within 60 seconds.
+ */
+ void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
+ {
+ final long latchTimeout = 60000;
+ boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
+ assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.",
+ success);
+ LOGGER.info("Quarter of messages sent");
+ }
+
+ /**
+ * @return the exception that terminated the sending thread, or null if none was recorded
+ */
+ public Exception getException()
+ {
+ return _exception;
+ }
+
+ /**
+ * @return an unmodifiable view of the last sent sequence number for each key value
+ */
+ Map<String, Integer> getMessageSequenceNumbersByKey()
+ {
+ return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
+ }
+
+ /**
+ * Starts the sending thread. The thread opens its own connection, sends MSG_COUNT
+ * messages followed by the shutdown message, and closes the connection.
+ */
+ void startSendingMessages()
+ {
+ Runnable messageSender = () -> {
+ try
+ {
+ LOGGER.info("Starting to send in background thread");
+ Connection producerConnection = getConnection();
+ try
+ {
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer backgroundProducer = producerSession.createProducer(_queue);
+ for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
+ {
+ Message message = nextMessage(messageNumber, producerSession, 2);
+ backgroundProducer.send(message);
+
+ putMessageInMap(message, _messageSequenceNumbersByKey);
+ _quarterOfMessagesSentLatch.countDown();
+ }
+
+ Message shutdownMessage = producerSession.createMessage();
+ shutdownMessage.setBooleanProperty(SHUTDOWN, true);
+ // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will
+ // otherwise consider them to have the same key.
+ shutdownMessage.setStringProperty(KEY_PROPERTY, _threadName);
+
+ backgroundProducer.send(shutdownMessage);
+ }
+ finally
+ {
+ producerConnection.close();
+ }
+
+ LOGGER.info("Finished sending in background thread");
+ }
+ catch (Exception e)
+ {
+ // Log as well as record: the failure must not vanish if the test never asks this
+ // particular producer for its exception.
+ LOGGER.error("Background producer '{}' failed", _threadName, e);
+ _exception = e;
+ }
+ };
+
+ _thread = new Thread(messageSender);
+ _thread.setName(_threadName);
+ _thread.start();
+ }
+
+ /**
+ * Waits up to 120 seconds for the sending thread to finish and fails the test if it is
+ * still alive afterwards.
+ */
+ void join() throws InterruptedException
+ {
+ final int timeoutInMillis = 120000;
+ _thread.join(timeoutInMillis);
+ assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
+ }
+ }
+
+ /**
+ * Creates an entity of type org.apache.qpid.LastValueQueue through AMQP management.
+ *
+ * @param queueName name of the queue to create
+ * @param keyProperty value for the LastValueQueue.LVQ_KEY attribute
+ * @param enforceBrowseOnly when true, the ensureNondestructiveConsumers attribute is set to true
+ */
+ private void createConflationQueue(final String queueName,
+ final String keyProperty, final boolean enforceBrowseOnly) throws Exception
+ {
+ final Map<String, Object> lvqAttributes = new HashMap<>();
+ lvqAttributes.put(LastValueQueue.LVQ_KEY, keyProperty);
+
+ // The attribute is only supplied when requested; otherwise it is left out entirely.
+ if (enforceBrowseOnly)
+ {
+ lvqAttributes.put("ensureNondestructiveConsumers", true);
+ }
+
+ createEntityUsingAmqpManagement(queueName, "org.apache.qpid.LastValueQueue", lvqAttributes);
+ }
+
+ /**
+ * Creates the test message for the given sequence number, cycling over 10 key values.
+ */
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
+ {
+ final int defaultNumberOfKeys = 10;
+ return nextMessage(msg, producerSession, defaultNumberOfKeys);
+ }
+
+ /**
+ * Creates a text message whose key property is the sequence number modulo the number of
+ * unique key values and whose sequence number property is the sequence number itself.
+ *
+ * @param msg the sequence number of the message
+ * @param producerSession the session used to create the message
+ * @param numberOfUniqueKeyValues how many distinct key values to cycle over
+ * @return the populated message
+ */
+ private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
+ {
+ final Message outgoing = producerSession.createTextMessage("Message: " + msg);
+
+ outgoing.setStringProperty(KEY_PROPERTY, String.valueOf(msg % numberOfUniqueKeyValues));
+ outgoing.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
+
+ return outgoing;
+ }
+
+ /**
+ * Publishes messages with sequence numbers fromIndex (inclusive) to toIndex (exclusive) on a
+ * new connection, committing the transacted session after every single message.
+ *
+ * @param queue the destination queue
+ * @param fromIndex first sequence number to send
+ * @param toIndex sequence number at which to stop; not sent itself
+ */
+ private void sendMessages(final Queue queue, final int fromIndex, final int toIndex)
+ throws JMSException, NamingException
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ final Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ final MessageProducer publisher = txSession.createProducer(queue);
+
+ int sequenceNumber = fromIndex;
+ while (sequenceNumber < toIndex)
+ {
+ publisher.send(nextMessage(sequenceNumber, txSession));
+ // One commit per message.
+ txSession.commit();
+ sequenceNumber++;
+ }
+
+ publisher.close();
+ txSession.close();
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java b/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
deleted file mode 100644
index 6d22adf..0000000
--- a/systests/src/test/java/org/apache/qpid/server/queue/LastValueQueueTest.java
+++ /dev/null
@@ -1,573 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.queue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class LastValueQueueTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueTest.class);
-
- private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
- private static final String KEY_PROPERTY = "key";
-
- private static final int MSG_COUNT = 400;
-
- private String _queueName;
- private Queue _queue;
- private Connection _producerConnection;
- private MessageProducer _producer;
- private Session _producerSession;
- private Connection _consumerConnection;
- private Session _consumerSession;
- private MessageConsumer _consumer;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- _queueName = getTestQueueName();
- _producerConnection = getConnection();
- _producerSession = _producerConnection.createSession(true, Session.SESSION_TRANSACTED);
- }
-
- public void testConflation() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- createConflationQueue(_producerSession, false);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _producer.close();
- _producerSession.close();
- _producerConnection.close();
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
- Message received;
-
- List<Message> messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
- }
-
- public void testConflationWithRelease() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-
- createConflationQueue(_producerSession, false);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT/2; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
- Message received;
- List<Message> messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
- _consumerSession.close();
- _consumerConnection.close();
-
-
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
-
- messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
- }
-
-
- public void testConflationWithReleaseAfterNewPublish() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-
- createConflationQueue(_producerSession, false);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT/2; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
- Message received;
- List<Message> messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT/2 - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
- _consumer.close();
-
- for (int msg = MSG_COUNT/2; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- }
- _producerSession.commit();
-
- // this causes the "old" messages to be released
- _consumerSession.close();
- _consumerConnection.close();
-
-
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
-
- messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
- }
-
- public void testConflatedQueueDepth() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- createConflationQueue(_producerSession, false);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
- _producerConnection.start();
- final long queueDepth = getQueueDepth(_producerConnection,_queue);
-
- assertEquals(10, queueDepth);
- }
-
- public void testConflationBrowser() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- createConflationQueue(_producerSession, true);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
- Message received;
- List<Message> messages = new ArrayList<>();
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
-
- assertEquals("Unexpected number of messages received",10,messages.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
- messages.clear();
-
- _producer.send(nextMessage(MSG_COUNT, _producerSession));
- _producerSession.commit();
-
- while((received = _consumer.receive(getReceiveTimeout())) != null)
- {
- messages.add(received);
- }
- assertEquals("Unexpected number of messages received",1,messages.size());
- assertEquals("Unexpected message number received", MSG_COUNT, messages.get(0).getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
-
-
- _producer.close();
- _producerSession.close();
- _producerConnection.close();
- }
-
- public void testConflation2Browsers() throws Exception
- {
- _consumerConnection = getConnection();
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- createConflationQueue(_producerSession, true);
- _producer = _producerSession.createProducer(_queue);
-
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- _producer.send(nextMessage(msg, _producerSession));
- _producerSession.commit();
- }
-
- _consumer = _consumerSession.createConsumer(_queue);
- MessageConsumer consumer2 = _consumerSession.createConsumer(_queue);
-
- _consumerConnection.start();
- List<Message> messages = new ArrayList<>();
- List<Message> messages2 = new ArrayList<>();
- Message received = _consumer.receive(getReceiveTimeout());
- Message received2 = consumer2.receive(getReceiveTimeout());
-
- while(received!=null || received2!=null)
- {
- if(received != null)
- {
- messages.add(received);
- }
- if(received2 != null)
- {
- messages2.add(received2);
- }
-
-
- received = _consumer.receive(getReceiveTimeout());
- received2 = consumer2.receive(getReceiveTimeout());
-
- }
-
- assertEquals("Unexpected number of messages received on first browser",10,messages.size());
- assertEquals("Unexpected number of messages received on second browser",10,messages2.size());
-
- for(int i = 0 ; i < 10; i++)
- {
- Message msg = messages.get(i);
- assertEquals("Unexpected message number received on first browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- msg = messages2.get(i);
- assertEquals("Unexpected message number received on second browser", MSG_COUNT - 10 + i, msg.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
- }
-
-
- _producer.close();
- _producerSession.close();
- _producerConnection.close();
- }
-
- public void testParallelProductionAndConsumption() throws Exception
- {
- createConflationQueue(_producerSession, false);
-
- // Start producing threads that send messages
- BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer("Message sender1");
- messageProducer1.startSendingMessages();
- BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer("Message sender2");
- messageProducer2.startSendingMessages();
-
- Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1);
-
- messageProducer1.join();
- messageProducer2.join();
-
- final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
- assertEquals("Unexpected number of last sent messages sent by producer1", 2, lastSentMessages1.size());
- final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
- assertEquals(lastSentMessages1, lastSentMessages2);
-
- assertEquals("The last message sent for each key should match the last message received for that key",
- lastSentMessages1, lastReceivedMessages);
-
- assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
- }
-
- private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer) throws Exception
- {
- producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
-
- _consumerConnection = getConnectionWithPrefetch(1);
-
- _consumerSession = _consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- LOGGER.info("Starting to receive");
-
- _consumer = _consumerSession.createConsumer(_queue);
- _consumerConnection.start();
-
- Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>();
-
- Message message;
- int numberOfShutdownsReceived = 0;
- int numberOfMessagesReceived = 0;
- while(numberOfShutdownsReceived < 2)
- {
- message = _consumer.receive(getReceiveTimeout());
- assertNotNull("null received after " + numberOfMessagesReceived + " messages and " + numberOfShutdownsReceived + " shutdowns", message);
-
- if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
- {
- numberOfShutdownsReceived++;
- }
- else
- {
- numberOfMessagesReceived++;
- putMessageInMap(message, messageSequenceNumbersByKey);
- }
- }
-
- LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
-
- return messageSequenceNumbersByKey;
- }
-
- private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
- {
- String keyValue = message.getStringProperty(KEY_PROPERTY);
- Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
- messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
- }
-
- private class BackgroundMessageProducer
- {
- static final String SHUTDOWN = "SHUTDOWN";
-
- private final String _threadName;
-
- private volatile Exception _exception;
-
- private Thread _thread;
- private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>();
- private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT/4);
-
- public BackgroundMessageProducer(String threadName)
- {
- _threadName = threadName;
- }
-
- public void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
- {
- final long latchTimeout = 60000;
- boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
- assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.", success);
- LOGGER.info("Quarter of messages sent");
- }
-
- public Exception getException()
- {
- return _exception;
- }
-
- public Map<String, Integer> getMessageSequenceNumbersByKey()
- {
- return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
- }
-
- public void startSendingMessages()
- {
- Runnable messageSender = new Runnable()
- {
- @Override
- public void run()
- {
- try
- {
- LOGGER.info("Starting to send in background thread");
- Connection producerConnection = getConnection();
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer backgroundProducer = producerSession.createProducer(_queue);
- for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
- {
- Message message = nextMessage(messageNumber, producerSession, 2);
- backgroundProducer.send(message);
-
- putMessageInMap(message, _messageSequenceNumbersByKey);
- _quarterOfMessagesSentLatch.countDown();
- }
-
- Message shutdownMessage = producerSession.createMessage();
- shutdownMessage.setBooleanProperty(SHUTDOWN, true);
- // make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will
- // otherwise consider them to have the same key.
- shutdownMessage.setStringProperty(KEY_PROPERTY, _threadName);
-
- backgroundProducer.send(shutdownMessage);
-
- LOGGER.info("Finished sending in background thread");
- }
- catch (Exception e)
- {
- _exception = e;
- throw new RuntimeException(e);
- }
- }
- };
-
- _thread = new Thread(messageSender);
- _thread.setName(_threadName);
- _thread.start();
- }
-
- public void join() throws InterruptedException
- {
- final int timeoutInMillis = 120000;
- _thread.join(timeoutInMillis);
- assertFalse("Expected producer thread to finish within " + timeoutInMillis + "ms", _thread.isAlive());
- }
- }
-
- private void createConflationQueue(Session session, final boolean enforceBrowseOnly) throws QpidException, JMSException
- {
- if(isBroker10())
- {
- final Map<String, Object> arguments = new HashMap<>();
- arguments.put(LastValueQueue.LVQ_KEY, KEY_PROPERTY);
- if(enforceBrowseOnly)
- {
- arguments.put("ensureNondestructiveConsumers", true);
- }
- createEntityUsingAmqpManagement(_queueName, session, "org.apache.qpid.LastValueQueue", arguments);
- _queue = session.createQueue(_queueName);
- }
- else
- {
- String browserOnly = enforceBrowseOnly ? "mode: browse," : "";
- String addr = String.format("ADDR:%s; {create: always, %s" +
- "node: {x-declare:{arguments : {'qpid.last_value_queue_key':'%s'}}}}",
- _queueName, browserOnly, KEY_PROPERTY);
-
- _queue = session.createQueue(addr);
- session.createConsumer(_queue).close();
- }
- }
-
- private Message nextMessage(int msg, Session producerSession) throws JMSException
- {
- return nextMessage(msg, producerSession, 10);
- }
-
- private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
- {
- Message send = producerSession.createTextMessage("Message: " + msg);
-
- final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues);
- send.setStringProperty(KEY_PROPERTY, keyValue);
- send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
-
- return send;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b1a7aacc/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 054e5c9..9a249f2 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -190,7 +190,6 @@ org.apache.qpid.server.queue.FlowToDiskTest#*
# Tests require AMQP management
org.apache.qpid.server.routing.AlternateBindingRoutingTest#*
-org.apache.qpid.server.queue.LastValueQueueTest#testConflatedQueueDepth
org.apache.qpid.server.queue.QueueDepthWithSelectorTest#test
org.apache.qpid.test.unit.message.UTF8Test#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-6933: [System Tests] Refactor
nondestructive consumer tests as JMS 1.1 system test
Posted by or...@apache.org.
QPID-6933: [System Tests] Refactor nondestructive consumer tests as JMS 1.1 system test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b4e6fcf5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b4e6fcf5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b4e6fcf5
Branch: refs/heads/master
Commit: b4e6fcf55854076fce74360dadbd1f366e5a2b4f
Parents: e3f8e59
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Dec 28 20:22:46 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Dec 28 21:48:24 2017 +0000
----------------------------------------------------------------------
.../EnsureNondestructiveConsumersTest.java | 102 +++++++++++++++
.../EnsureNondestructiveConsumersTest.java | 128 -------------------
test-profiles/CPPExcludes | 1 -
3 files changed, 102 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e6fcf5/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/nondestructiveconsumer/EnsureNondestructiveConsumersTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/nondestructiveconsumer/EnsureNondestructiveConsumersTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/nondestructiveconsumer/EnsureNondestructiveConsumersTest.java
new file mode 100644
index 0000000..db02103
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/nondestructiveconsumer/EnsureNondestructiveConsumersTest.java
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.extensions.nondestructiveconsumer;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class EnsureNondestructiveConsumersTest extends JmsTestBase
+{
+
+ @Test
+ public void testEnsureNondestructiveConsumers() throws Exception
+ {
+ String queueName = getTestName();
+ createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue",
+ Collections.singletonMap("ensureNondestructiveConsumers", true));
+ Queue queue = createQueue(queueName);
+ int numberOfMessages = 5;
+ Connection connection = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+
+ Utils.sendMessages(session, queue, numberOfMessages);
+
+ MessageConsumer consumer1 = session.createConsumer(queue);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ Message receivedMsg = consumer1.receive(getReceiveTimeout());
+ assertNotNull("Message " + i + " not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+ }
+
+ assertNull("Unexpected message arrived", consumer1.receive(getShortReceiveTimeout()));
+
+ MessageConsumer consumer2 = session.createConsumer(queue);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ Message receivedMsg = consumer2.receive(getReceiveTimeout());
+ assertNotNull("Message " + i + " not received", receivedMsg);
+ assertEquals("Unexpected message", i, receivedMsg.getIntProperty(INDEX));
+ }
+
+ assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(Utils.createNextMessage(session, 6));
+
+ assertNotNull("Message not received on first consumer", consumer1.receive(getReceiveTimeout()));
+ assertNotNull("Message not received on second consumer", consumer2.receive(getReceiveTimeout()));
+
+ assertNull("Unexpected message arrived", consumer1.receive(getShortReceiveTimeout()));
+ assertNull("Unexpected message arrived", consumer2.receive(getShortReceiveTimeout()));
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ private long getShortReceiveTimeout()
+ {
+ return getReceiveTimeout() / 4;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e6fcf5/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java b/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java
deleted file mode 100644
index 1a85dc4..0000000
--- a/systests/src/test/java/org/apache/qpid/server/queue/EnsureNondestructiveConsumersTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.queue;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.qpid.QpidException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class EnsureNondestructiveConsumersTest extends QpidBrokerTestCase
-{
-
- private String _queueName;
- private Connection _connection;
- private Session _session;
- private Queue _queue;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
-
- _queueName = getTestQueueName();
- _connection = getConnectionWithSyncPublishing();
- _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- _connection.start();
- }
-
- private void createQueueEnsureNondestructiveConsumerOption(boolean ensureNonDestructiveConsumer)
- throws QpidException, JMSException
- {
- final Map<String,Object> arguments = new HashMap<>();
- if(isBroker10())
- {
- if(ensureNonDestructiveConsumer)
- {
- arguments.put("ensureNondestructiveConsumers", true);
- }
- createEntityUsingAmqpManagement(_queueName, _session, "org.apache.qpid.Queue", arguments);
- _queue = _session.createQueue(_queueName);
- }
- else
- {
- arguments.put("qpid.ensure_nondestructive_consumers", String.valueOf(ensureNonDestructiveConsumer));
- ((AMQSession<?, ?>) _session).createQueue(_queueName, false, true, false, arguments);
- _queue = new org.apache.qpid.client.AMQQueue("amq.direct", _queueName);
- ((AMQSession<?, ?>) _session).declareAndBind((AMQDestination) _queue);
- }
- }
-
- public void testEnsureNondestructiveConsumers() throws QpidException, JMSException
- {
- createQueueEnsureNondestructiveConsumerOption(true);
- final MessageProducer prod = _session.createProducer(_queue);
- TextMessage textMessage;
-
- for(int i = 0; i < 5; i++)
- {
- textMessage = _session.createTextMessage("hello");
- textMessage.setIntProperty("msgID", i);
- prod.send(textMessage);
- }
-
- MessageConsumer cons1 = _session.createConsumer(_queue);
-
- for(int i = 0; i < 5 ; i++)
- {
- Message receivedMsg = cons1.receive(500);
- assertNotNull("Message "+i+" not received", receivedMsg);
- assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID"));
- }
-
- assertNull("Unexpected message arrived", cons1.receive(500));
-
- MessageConsumer cons2 = _session.createConsumer(_queue);
-
- for(int i = 0; i < 5 ; i++)
- {
- Message receivedMsg = cons2.receive(500);
- assertNotNull("Message "+i+" not received", receivedMsg);
- assertEquals("Unexpected message", i, receivedMsg.getIntProperty("msgID"));
- }
-
- assertNull("Unexpected message arrived", cons2.receive(500));
-
- textMessage = _session.createTextMessage("hello");
- textMessage.setIntProperty("msgID", 6);
- prod.send(textMessage);
-
- assertNotNull("Message not received on first consumer", cons1.receive(500));
- assertNotNull("Message not received on second consumer", cons2.receive(500));
-
- assertNull("Unexpected message arrived", cons1.receive(500));
- assertNull("Unexpected message arrived", cons2.receive(500));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b4e6fcf5/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 966aebd..054e5c9 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -161,7 +161,6 @@ org.apache.qpid.systest.MessageCompressionTest#*
org.apache.qpid.test.unit.client.AMQSessionTest#testQueueDepthForQueueThatDoesNotExistLegacyBehaviour_08_091
-org.apache.qpid.server.queue.EnsureNondestructiveConsumersTest#*
org.apache.qpid.server.protocol.v0_8.*
//Qpid Broker-J BDB System Tests
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-6933: [System Tests] Add durable
subscriber test and remove redundant durable subscriber tests
Posted by or...@apache.org.
QPID-6933: [System Tests] Add durable subscriber test and remove redundant durable subscriber tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e3f8e59a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e3f8e59a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e3f8e59a
Branch: refs/heads/master
Commit: e3f8e59addf2eddda0ead475b4df18f87fe269e3
Parents: 40a7fdb
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Dec 28 19:56:19 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Dec 28 21:48:24 2017 +0000
----------------------------------------------------------------------
.../jms_1_1/topic/DurableSubscribtionTest.java | 62 +++++++
.../persistent/NoLocalAfterRecoveryTest.java | 185 -------------------
test-profiles/CPPExcludes | 3 -
test-profiles/JavaTransientExcludes | 1 -
4 files changed, 62 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e3f8e59a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
index c759207..1a5f163 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/topic/DurableSubscribtionTest.java
@@ -463,6 +463,68 @@ public class DurableSubscribtionTest extends JmsTestBase
}
}
+
+ @Test
+ public void testResubscribeWithChangedNoLocal() throws Exception
+ {
+ String subscriptionName = getTestName() + "_sub";
+ Topic topic = createTopic(getTestName());
+ String clientId = "testClientId";
+ Connection connection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber durableSubscriber =
+ session.createDurableSubscriber(topic, subscriptionName, null, false);
+
+ MessageProducer producer = session.createProducer(topic);
+ producer.send(session.createTextMessage("A"));
+ producer.send(session.createTextMessage("B"));
+ session.commit();
+
+ connection.start();
+
+ Message receivedMessage = durableSubscriber.receive(getReceiveTimeout());
+ assertTrue("TextMessage should be received", receivedMessage instanceof TextMessage);
+ assertEquals("Unexpected message received", "A", ((TextMessage)receivedMessage).getText());
+
+ session.commit();
+ }
+ finally
+ {
+ connection.close();
+ }
+
+ connection = getConnectionBuilder().setClientId(clientId).build();
+ try
+ {
+ connection.start();
+
+ Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+ TopicSubscriber noLocalSubscriber2 = session2.createDurableSubscriber(topic, subscriptionName, null, true);
+
+ Connection secondConnection = getConnectionBuilder().setClientId("secondConnection").build();
+ try
+ {
+ Session secondSession = secondConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer secondProducer = secondSession.createProducer(topic);
+ secondProducer.send(secondSession.createTextMessage("C"));
+ }
+ finally
+ {
+ secondConnection.close();
+ }
+
+ Message noLocalSubscriberMessage = noLocalSubscriber2.receive(getReceiveTimeout());
+ assertTrue("TextMessage should be received", noLocalSubscriberMessage instanceof TextMessage);
+ assertEquals("Unexpected message received", "C", ((TextMessage)noLocalSubscriberMessage).getText());
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
/**
* create and register a durable subscriber with a message selector and then close it
* crash the broker
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e3f8e59a/systests/src/test/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java b/systests/src/test/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
deleted file mode 100644
index 35cc271..0000000
--- a/systests/src/test/java/org/apache/qpid/server/persistent/NoLocalAfterRecoveryTest.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.persistent;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-/**
- * Verifies that after recovery, a new Connection with no-local in use is
- * able to receive messages sent prior to the broker restart.
- */
-public class NoLocalAfterRecoveryTest extends QpidBrokerTestCase
-{
- protected final String MY_TOPIC_SUBSCRIPTION_NAME = getTestQueueName();
- protected static final int SEND_COUNT = 10;
-
- public void testNoLocalNotQueued() throws Exception
- {
- if(!isBrokerStorePersistent())
- {
- fail("This test requires a broker with a persistent store");
- }
-
- Connection connection = getConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = createTopic(connection, MY_TOPIC_SUBSCRIPTION_NAME);
-
- TopicSubscriber noLocalSubscriber = session.
- createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
- null, true);
-
- TopicSubscriber normalSubscriber = session.
- createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-Normal",
- null, false);
-
- sendMessage(session, topic, SEND_COUNT);
-
- // Check messages can be received as expected.
- connection.start();
-
- //As the no-local subscriber was on the same connection the messages were
- //published on, it will receive no messages as they will be discarded on the broker
- List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
- assertEquals("No Local Subscriber Received messages", 0, received.size());
-
- received = receiveMessage(normalSubscriber, SEND_COUNT);
- assertEquals("Normal Subscriber Received no messages",
- SEND_COUNT, received.size());
- session.commit();
-
- normalSubscriber.close();
- connection.close();
-
- //Ensure the no-local subscriber's messages were discarded by restarting the broker
- //and reconnecting to the subscription to ensure they were not recovered.
- restartDefaultBroker();
-
- Connection connection2 = getConnection();
- connection2.start();
-
- Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic2 = session2.createTopic(MY_TOPIC_SUBSCRIPTION_NAME);
-
- TopicSubscriber noLocalSubscriber2 = session2.
- createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",
- null, true);
-
- // The NO-local subscriber should not get any messages
- received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
- session2.commit();
- assertEquals("No Local Subscriber Received messages", 0, received.size());
-
- noLocalSubscriber2.close();
-
-
- }
-
-
- public void testNonNoLocalQueued() throws Exception
- {
- if(!isBrokerStorePersistent())
- {
- fail("This test requires a broker with a persistent store");
- }
-
- Connection connection = getConnectionBuilder().setClientId("testClientId").build();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic = createTopic(connection, MY_TOPIC_SUBSCRIPTION_NAME);
-
- TopicSubscriber noLocalSubscriber =
- session.createDurableSubscriber(topic, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal", null, true);
-
-
- sendMessage(session, topic, SEND_COUNT);
-
- // Check messages can be received as expected.
- connection.start();
-
- List<Message> received = receiveMessage(noLocalSubscriber, SEND_COUNT);
- assertEquals("No Local Subscriber Received messages", 0, received.size());
-
-
-
- session.commit();
-
- Connection connection3 = getConnection();
- Session session3 = connection3.createSession(true, Session.SESSION_TRANSACTED);
- sendMessage(session3, topic, SEND_COUNT);
-
-
- connection.close();
-
- //We didn't receive the messages on the durable queue for the no-local subscriber
- //so they are still on the broker. Restart the broker, prompting their recovery.
- restartDefaultBroker();
-
- Connection connection2 = getConnectionBuilder().setClientId("testClientId").build();
- connection2.start();
-
- Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
- Topic topic2 = createTopic(connection2, MY_TOPIC_SUBSCRIPTION_NAME);
-
- TopicSubscriber noLocalSubscriber2 =
- session2.createDurableSubscriber(topic2, MY_TOPIC_SUBSCRIPTION_NAME + "-NoLocal",null, true);
-
- // The NO-local subscriber should receive messages sent from connection3
- received = receiveMessage(noLocalSubscriber2, SEND_COUNT);
- session2.commit();
- assertEquals("No Local Subscriber did not receive expected messages", SEND_COUNT, received.size());
-
- noLocalSubscriber2.close();
-
-
- }
-
- protected List<Message> receiveMessage(MessageConsumer messageConsumer,
- int count) throws JMSException
- {
-
- List<Message> receivedMessages = new ArrayList<Message>(count);
- for (int i = 0; i < count; i++)
- {
- Message received = messageConsumer.receive(1000);
-
- if (received != null)
- {
- receivedMessages.add(received);
- }
- else
- {
- break;
- }
- }
-
- return receivedMessages;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e3f8e59a/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index 6b10a13..966aebd 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -63,9 +63,6 @@ org.apache.qpid.test.client.QueueBrowsingFlowToDiskTest#*
// This test currently does not pick up the runtime location of the nonVm queueBacking store.
org.apache.qpid.test.unit.close.FlowToDiskBackingQueueDeleteTest#*
-//QPID-1818 : 0-10 Client code path does not correctly restore a transacted session after failover.
-org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
-
// QPID-1730: the C++ server has a totally different logging mechanism. We should split this file differently
org.apache.qpid.server.AlertingTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e3f8e59a/test-profiles/JavaTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes
index 632f63d..964aefd 100644
--- a/test-profiles/JavaTransientExcludes
+++ b/test-profiles/JavaTransientExcludes
@@ -18,7 +18,6 @@
//
//These tests require a persistent store
-org.apache.qpid.server.persistent.NoLocalAfterRecoveryTest#*
org.apache.qpid.server.store.PersistentStoreTest#*
org.apache.qpid.server.store.SplitStoreTest#*
org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org