You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/02/20 12:43:07 UTC

[1/3] qpid-broker-j git commit: QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment deliveryCount when MessageAcquireMode.NOT_ACQUIRED

Repository: qpid-broker-j
Updated Branches:
  refs/heads/7.0.x 8e874ce58 -> a65119984


QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment deliveryCount when MessageAcquireMode.NOT_ACQUIRED

(cherry picked from commit a6408e156a717e8e91aa95e3220ed300c71c52bd)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/eff7b5dd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/eff7b5dd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/eff7b5dd

Branch: refs/heads/7.0.x
Commit: eff7b5dd1a70b653103329c7c2de8422435525b0
Parents: 8e874ce
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:50:42 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Feb 20 11:45:24 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/ConsumerTarget_0_10.java     | 128 ++++++++++++++++---
 ...ExplicitAcceptDispositionChangeListener.java |  73 -----------
 ...ImplicitAcceptDispositionChangeListener.java |  80 ------------
 .../server/protocol/v0_10/ServerSession.java    |   6 +-
 4 files changed, 110 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/eff7b5dd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 21156de..8d42d2b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -305,6 +305,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
 
 
         _postIdSettingAction.setXfr(xfr);
+        _postIdSettingAction.setAction(null);
+
+        if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+        {
+            entry.incrementDeliveryCount();
+        }
+
         if(_acceptMode == MessageAcceptMode.EXPLICIT)
         {
             _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
@@ -313,11 +320,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         {
             _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
         }
-        else
-        {
-            _postIdSettingAction.setAction(null);
-        }
-
 
         _session.sendMessage(xfr, _postIdSettingAction);
         xfr.dispose();
@@ -328,7 +330,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         _postIdSettingAction.setAction(null);
         _postIdSettingAction.setXfr(null);
 
-        entry.incrementDeliveryCount();
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
         {
             forceDequeue(entry, false);
@@ -403,27 +404,14 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
 
     void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
-        entry.setRedelivered();
         if (entry.makeAcquisitionUnstealable(consumer))
         {
             entry.routeToAlternate(null, null);
         }
     }
 
-    void release(final MessageInstanceConsumer consumer,
-                 final MessageInstance entry,
-                 final boolean setRedelivered)
+    void release(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
-        if (setRedelivered)
-        {
-            entry.setRedelivered();
-        }
-
-        if (getSession().isClosing() || !setRedelivered)
-        {
-            entry.decrementDeliveryCount();
-        }
-
         if (isMaxDeliveryLimitReached(entry))
         {
             sendToDLQOrDiscard(consumer, entry);
@@ -434,7 +422,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
-    protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
+    private void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 
@@ -625,4 +613,102 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
+    static abstract class AbstractDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+    {
+        final MessageInstance _entry;
+        final ConsumerTarget_0_10 _target;
+        final MessageInstanceConsumer _consumer;
+
+        AbstractDispositionChangeListener(final MessageInstance entry,
+                                          final ConsumerTarget_0_10 target,
+                                          final MessageInstanceConsumer consumer)
+        {
+            _entry = entry;
+            _target = target;
+            _consumer = consumer;
+        }
+
+        @Override
+        public final void onRelease(boolean setRedelivered, final boolean closing)
+        {
+            _target.release(_consumer, _entry);
+
+            if (setRedelivered)
+            {
+                _entry.setRedelivered();
+            }
+
+            if (closing || !setRedelivered)
+            {
+                _entry.decrementDeliveryCount();
+            }
+        }
+
+        @Override
+        public final void onReject()
+        {
+            _entry.setRedelivered();
+            _target.reject(_consumer, _entry);
+        }
+    }
+
+    static class ImplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+    {
+
+        private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+        ImplicitAcceptDispositionChangeListener(final MessageInstance entry,
+                                                final ConsumerTarget_0_10 target,
+                                                final MessageInstanceConsumer consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+        }
+
+        @Override
+        public boolean acquire()
+        {
+            boolean acquired = _entry.acquire(_consumer);
+            if(acquired)
+            {
+                _entry.incrementDeliveryCount();
+                _target.addUnacknowledgedMessage(_entry);
+            }
+            return acquired;
+        }
+    }
+
+    static class ExplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+    {
+
+        ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+                                                ConsumerTarget_0_10 target,
+                                                final MessageInstanceConsumer consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            _target.acknowledge(_consumer, _entry);
+        }
+        @Override
+        public boolean acquire()
+        {
+            final boolean acquired = _entry.acquire(_consumer);
+            if (acquired)
+            {
+                _entry.incrementDeliveryCount();
+            }
+            return acquired;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/eff7b5dd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index 427355b..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-
-class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ExplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        _target.acknowledge(_consumer, _entry);
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        return _entry.acquire(_consumer);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/eff7b5dd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index bcfa205..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        boolean acquired = _entry.acquire(_consumer);
-        if(acquired)
-        {
-            _target.addUnacknowledgedMessage(_entry);
-        }
-        return acquired;
-
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/eff7b5dd/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 32a2474..22d539b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -860,7 +860,7 @@ public class ServerSession extends SessionInvoker
     {
         void onAccept();
 
-        void onRelease(boolean setRedelivered);
+        void onRelease(boolean setRedelivered, final boolean closing);
 
         void onReject();
 
@@ -1012,7 +1012,7 @@ public class ServerSession extends SessionInvoker
                                           @Override
                                           public void performAction(MessageDispositionChangeListener listener)
                                           {
-                                              listener.onRelease(setRedelivered);
+                                              listener.onRelease(setRedelivered, false);
                                           }
                                       });
     }
@@ -1158,7 +1158,7 @@ public class ServerSession extends SessionInvoker
 
         for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
         {
-            listener.onRelease(true);
+            listener.onRelease(true, true);
         }
         _messageDispositionListenerMap.clear();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-8099: [Broker-J] Make MessageInfo and LogRecord implement ManagedAttributeValue.

Posted by or...@apache.org.
QPID-8099: [Broker-J] Make MessageInfo and LogRecord implement ManagedAttributeValue.

(cherry picked from commit 1a28080d0872c2ff9c52f3ef915d83e33fbf4de0)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/cc65384e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/cc65384e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/cc65384e

Branch: refs/heads/7.0.x
Commit: cc65384e0cf0ce88e2ab1ee741cbb1afd8675778
Parents: eff7b5d
Author: Keith Wall <kw...@apache.org>
Authored: Thu Feb 15 09:25:50 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Feb 20 12:42:13 2018 +0000

----------------------------------------------------------------------
 .../main/java/org/apache/qpid/server/message/MessageInfo.java    | 3 ++-
 .../java/org/apache/qpid/server/logging/logback/LogRecord.java   | 4 +++-
 2 files changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cc65384e/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
index 0ad3df0..055ec23 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageInfo.java
@@ -23,10 +23,11 @@ package org.apache.qpid.server.message;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.qpid.server.model.ManagedAttributeValue;
 import org.apache.qpid.server.model.ManagedAttributeValueType;
 
 @ManagedAttributeValueType
-public interface MessageInfo
+public interface MessageInfo extends ManagedAttributeValue
 {
     long getId();
     long getSize();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/cc65384e/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/LogRecord.java
----------------------------------------------------------------------
diff --git a/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/LogRecord.java b/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/LogRecord.java
index 0010a0a..8356c20 100644
--- a/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/LogRecord.java
+++ b/broker-plugins/logging-logback/src/main/java/org/apache/qpid/server/logging/logback/LogRecord.java
@@ -21,10 +21,12 @@
 package org.apache.qpid.server.logging.logback;
 
 import ch.qos.logback.classic.spi.ILoggingEvent;
+
+import org.apache.qpid.server.model.ManagedAttributeValue;
 import org.apache.qpid.server.model.ManagedAttributeValueType;
 
 @ManagedAttributeValueType
-public class LogRecord
+public class LogRecord implements ManagedAttributeValue
 {
     private final ILoggingEvent _event;
     private final long _id;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-8098: [Broker-J] Add supporting test case relating to browsing and delivery counts

Posted by or...@apache.org.
QPID-8098: [Broker-J] Add supporting test case relating to browsing and delivery counts

(cherry picked from commit 59bd08a3103495ae0e8602c3e23b222a02a97bdf. Merge conflicts resolved manually)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a6511998
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a6511998
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a6511998

Branch: refs/heads/7.0.x
Commit: a6511998486d9a828c1cd37384b3fd82186c0c35
Parents: cc65384
Author: Keith Wall <kw...@apache.org>
Authored: Thu Feb 15 10:24:22 2018 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Feb 20 12:42:13 2018 +0000

----------------------------------------------------------------------
 .../qpid/test/utils/AmqpManagementFacade.java   | 33 ++++++++-
 .../test/unit/client/MaxDeliveryCountTest.java  | 74 ++++++++++++++++++++
 2 files changed, 105 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6511998/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
index 041604a..687f1c7 100644
--- a/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
+++ b/systests/src/main/java/org/apache/qpid/test/utils/AmqpManagementFacade.java
@@ -33,9 +33,11 @@ import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageEOFException;
 import javax.jms.MessageProducer;
 import javax.jms.ObjectMessage;
 import javax.jms.Session;
+import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -159,7 +161,7 @@ public class AmqpManagementFacade
             else
             {
                 ObjectMapper objectMapper = new ObjectMapper();
-                String jsonifiedValue = null;
+                String jsonifiedValue;
                 try
                 {
                     jsonifiedValue = objectMapper.writeValueAsString(value);
@@ -182,7 +184,34 @@ public class AmqpManagementFacade
         Message response = consumer.receive(5000);
         try
         {
-            if (response instanceof MapMessage)
+            int statusCode = response.getIntProperty("statusCode");
+            if (statusCode < 200 || statusCode > 299)
+            {
+                throw new RuntimeException(String.format("Unexpected operation status %d : %s",
+                                                         statusCode,
+                                                         response.getStringProperty("statusDescription")));
+            }
+            if (response instanceof StreamMessage)
+            {
+                StreamMessage bodyStream = (StreamMessage) response;
+                List<Object> result = new ArrayList<>();
+                boolean done = false;
+                do
+                {
+                    try
+                    {
+                        result.add(bodyStream.readObject());
+                    }
+                    catch (MessageEOFException mfe)
+                    {
+                        // Expected - end of stream
+                        done = true;
+                    }
+                }
+                while (!done);
+                return result;
+            }
+            else if (response instanceof MapMessage)
             {
                 MapMessage bodyMap = (MapMessage) response;
                 Map<String, Object> result = new TreeMap<>();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6511998/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
index 285224a..428a1d9 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
@@ -36,6 +36,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
+import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
@@ -69,6 +70,7 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
     private String _failMsg;
     private static final int MSG_COUNT = 15;
     private static final int MAX_DELIVERY_COUNT = 2;
+    private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
     private CountDownLatch _awaitCompletion;
 
     private long _awaitEmptyQueue;
@@ -172,6 +174,78 @@ public class MaxDeliveryCountTest extends QpidBrokerTestCase
         doTest(Session.SESSION_TRANSACTED, _redeliverMsgs, true);
     }
 
+    public void testBrowsingDoesNotIncrementDeliveryCount() throws Exception
+    {
+        Connection connection = getConnection();
+        try
+        {
+            connection.start();
+            final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            final Map<String, Object> messageInfoBefore = getMessageInfo(_testQueueName, 0);
+            assertEquals("Unexpected delivery count before browse", 0, messageInfoBefore.get("deliveryCount"));
+
+            browseQueueAndValidationDeliveryHeaders(session, _testQueue);
+
+            final Map<String, Object> messageInfoAfter = getMessageInfo(_testQueueName, 0);
+            assertEquals("Unexpected delivery count after first browse", 0, messageInfoAfter.get("deliveryCount"));
+
+            browseQueueAndValidationDeliveryHeaders(session, _testQueue);
+
+            final Map<String, Object> messageInfoAfterSecondBrowse = getMessageInfo(_testQueueName, 0);
+            assertEquals("Unexpected delivery count after second browse",
+                         0,
+                         messageInfoAfterSecondBrowse.get("deliveryCount"));
+
+            browseQueueAndValidationDeliveryHeaders(session, _testQueue);
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    private Map<String, Object> getMessageInfo(String queueName, final int index) throws Exception
+    {
+        List<Map<String, Object>> messages;
+        Connection connection = getConnection();
+        try
+        {
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            messages = (List<Map<String, Object>>) performOperationUsingAmqpManagement(queueName,
+                                                                                       "getMessageInfo",
+                                                                                       session,
+                                                                                       "org.apache.qpid.Queue",
+                                                                                       Collections.emptyMap());
+        }
+        finally
+        {
+            connection.close();
+        }
+        assertTrue("Too few messsages on the queue: " + messages.size(), messages.size()>index);
+        return messages.get(index);
+    }
+
+    private void browseQueueAndValidationDeliveryHeaders(final Session session, final Queue queue) throws Exception
+    {
+        final QueueBrowser browser = session.createBrowser(queue);
+        @SuppressWarnings("unchecked")
+        final List<Message> messages = (List<Message>) new ArrayList(Collections.list(browser.getEnumeration()));
+        assertEquals("Unexpected number of messages seen by browser", MSG_COUNT, messages.size());
+        for (Message browsedMessage: messages)
+        {
+            assertFalse(browsedMessage.getJMSRedelivered());
+
+            if (browsedMessage.propertyExists(JMSX_DELIVERY_COUNT))
+            {
+                assertEquals(1, browsedMessage.getIntProperty(JMSX_DELIVERY_COUNT));
+            }
+        }
+        browser.close();
+    }
+
     private void doTest(final int deliveryMode,
                         final List<Integer> redeliverMsgs,
                         final boolean synchronous) throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org