You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/02/20 12:43:09 UTC
[3/3] qpid-broker-j git commit: QPID-8098: [Broker-J] Add supporting
test case relating to browsing and delivery counts
QPID-8098: [Broker-J] Add supporting test case relating to browsing and delivery counts
(cherry picked from commit 59bd08a3103495ae0e8602c3e23b222a02a97bdf. Merge conflicts resolved manually)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a6511998
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a6511998
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a6511998
Branch: refs/heads/7.0.x
Commit: a6511998486d9a828c1cd37384b3fd82186c0c35
Parents: cc65384
Author: Keith Wall <kw...@apache.org>
Authored: Thu Feb 15 10:24:22 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Feb 20 12:42:13 2018 +0000
----------------------------------------------------------------------
.../qpid/test/utils/AmqpManagementFacade.java | 33 ++++++++-
.../test/unit/client/MaxDeliveryCountTest.java | 74 ++++++++++++++++++++
2 files changed, 105 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6511998/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index 041604a..687f1c7 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -33,9 +33,11 @@ import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
+import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -159,7 +161,7 @@ public class AmqpManagementFacade
else
{
ObjectMapper objectMapper = new ObjectMapper();
- String jsonifiedValue = null;
+ String jsonifiedValue;
try
{
jsonifiedValue = objectMapper.writeValueAsString(value);
@@ -182,7 +184,34 @@ public class AmqpManagementFacade
Message response = consumer.receive(5000);
try
{
- if (response instanceof MapMessage)
+ int statusCode = response.getIntProperty("statusCode");
+ if (statusCode < 200 || statusCode > 299)
+ {
+ throw new RuntimeException(String.format("Unexpected operation status %d : %s",
+ statusCode,
+ response.getStringProperty("statusDescription")));
+ }
+ if (response instanceof StreamMessage)
+ {
+ StreamMessage bodyStream = (StreamMessage) response;
+ List<Object> result = new ArrayList<>();
+ boolean done = false;
+ do
+ {
+ try
+ {
+ result.add(bodyStream.readObject());
+ }
+ catch (MessageEOFException mfe)
+ {
+ // Expected - end of stream
+ done = true;
+ }
+ }
+ while (!done);
+ return result;
+ }
+ else if (response instanceof MapMessage)
{
MapMessage bodyMap = (MapMessage) response;
Map<String, Object> result = new TreeMap<>();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6511998/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 285224a..428a1d9 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -36,6 +36,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
+import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
@@ -69,6 +70,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
private String _failMsg;
private static final int MSG_COUNT = 15;
private static final int MAX_DELIVERY_COUNT = 2;
+ private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
private CountDownLatch _awaitCompletion;
private long _awaitEmptyQueue;
@@ -172,6 +174,78 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true);
}
+    /**
+     * Checks that the "deliveryCount" reported by {@link #getMessageInfo} for the
+     * first message on the test queue is 0 before any browse, after one browse and
+     * after two browses. A browse (which also validates the delivery headers of
+     * each browsed message) follows every check, so the queue is browsed three
+     * times in total.
+     */
+    public void testBrowsingDoesNotIncrementDeliveryCount() throws Exception
+    {
+        final Connection browsingConnection = getConnection();
+        try
+        {
+            browsingConnection.start();
+            final Session browsingSession =
+                    browsingConnection.createSession(true, Session.SESSION_TRANSACTED);
+
+            // One entry per checkpoint; each checkpoint is followed by a browse.
+            final String[] checkpointFailureMessages = {
+                    "Unexpected delivery count before browse",
+                    "Unexpected delivery count after first browse",
+                    "Unexpected delivery count after second browse"
+            };
+
+            for (final String failureMessage : checkpointFailureMessages)
+            {
+                final Map<String, Object> headMessageInfo = getMessageInfo(_testQueueName, 0);
+                assertEquals(failureMessage, 0, headMessageInfo.get("deliveryCount"));
+
+                browseQueueAndValidationDeliveryHeaders(browsingSession, _testQueue);
+            }
+        }
+        finally
+        {
+            browsingConnection.close();
+        }
+    }
+
+
+ private Map<String, Object> getMessageInfo(String queueName, final int index) throws Exception
+ {
+ List<Map<String, Object>> messages;
+ Connection connection = getConnection();
+ try
+ {
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ messages = (List<Map<String, Object>>) performOperationUsingAmqpManagement(queueName,
+ "getMessageInfo",
+ session,
+ "org.apache.qpid.Queue",
+ Collections.emptyMap());
+ }
+ finally
+ {
+ connection.close();
+ }
+ assertTrue("Too few messsages on the queue: " + messages.size(), messages.size()>index);
+ return messages.get(index);
+ }
+
+    /**
+     * Browses the given queue and validates the delivery-related headers of every
+     * browsed message: exactly {@code MSG_COUNT} messages must be seen, none may be
+     * flagged as redelivered, and where the JMSXDeliveryCount property exists it
+     * must equal 1.
+     *
+     * @param session session used to create the browser
+     * @param queue queue to browse
+     * @throws Exception if browsing fails
+     */
+    private void browseQueueAndValidationDeliveryHeaders(final Session session, final Queue queue) throws Exception
+    {
+        final QueueBrowser browser = session.createBrowser(queue);
+        try
+        {
+            // QueueBrowser#getEnumeration() is a raw Enumeration, so this cast is
+            // unavoidable. Collections.list already returns a fresh list, so no
+            // further copy is made.
+            @SuppressWarnings("unchecked")
+            final List<Message> messages = (List<Message>) Collections.list(browser.getEnumeration());
+            assertEquals("Unexpected number of messages seen by browser", MSG_COUNT, messages.size());
+            for (Message browsedMessage : messages)
+            {
+                assertFalse(browsedMessage.getJMSRedelivered());
+
+                if (browsedMessage.propertyExists(JMSX_DELIVERY_COUNT))
+                {
+                    assertEquals(1, browsedMessage.getIntProperty(JMSX_DELIVERY_COUNT));
+                }
+            }
+        }
+        finally
+        {
+            // Close the browser even when an assertion above fails.
+            browser.close();
+        }
+    }
+
+
private void doTest(final int deliveryMode,
final List<Integer> redeliverMsgs,
final boolean synchronous) throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org