You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/14 17:06:03 UTC

[1/3] qpid-broker-j git commit: QPID-8083 [System Tests] [REST/HTTP] Remove keystores/truststores left behind by the test

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 7601044d1 -> a827841b2


QPID-8083 [System Tests] [REST/HTTP] Remove keystores/truststores left behind by the test


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7851928d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7851928d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7851928d

Branch: refs/heads/master
Commit: 7851928d030754066034bd225084addcbf332d66
Parents: 7601044
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 10:38:44 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 10:38:44 2018 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/tests/http/rest/model/ReadTest.java   | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7851928d/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
index 43ee305..d33f0b1 100644
--- a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
+++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
@@ -267,6 +267,8 @@ public class ReadTest extends HttpTestBase
                 final Map<String, Object> store = getHelper().getJsonAsMap(storeUrl);
                 assertThat(store.get(NonJavaKeyStore.PRIVATE_KEY_URL),
                            is(equalTo(AbstractConfiguredObject.SECURED_STRING_VALUE)));
+
+                getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
             }
 
             {
@@ -279,6 +281,8 @@ public class ReadTest extends HttpTestBase
 
                 final Map<String, Object> store = getHelper().getJsonAsMap(storeUrl);
                 assertThat(store.get(NonJavaKeyStore.PRIVATE_KEY_URL), is(equalTo(privateKeyFileUrl)));
+
+                getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
             }
         }
         finally
@@ -307,6 +311,8 @@ public class ReadTest extends HttpTestBase
 
         final Map<String, Object> full = getHelper().getJsonAsMap(storeUrl +  String.format("?oversize=%d", dataUrl.length()));
         assertThat(full.get(NonJavaTrustStore.CERTIFICATES_URL), is(equalTo(dataUrl)));
+
+        getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
     }
 
     private String createLoggerAndRule(final String loggerName, final String inclusionRuleName) throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment deliveryCount when MessageAcquireMode.NOT_ACQUIRED

Posted by kw...@apache.org.
QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment deliveryCount when MessageAcquireMode.NOT_ACQUIRED


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a6408e15
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a6408e15
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a6408e15

Branch: refs/heads/master
Commit: a6408e156a717e8e91aa95e3220ed300c71c52bd
Parents: 7851928
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:50:42 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:50:42 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/ConsumerTarget_0_10.java     | 128 ++++++++++++++++---
 ...ExplicitAcceptDispositionChangeListener.java |  73 -----------
 ...ImplicitAcceptDispositionChangeListener.java |  80 ------------
 .../server/protocol/v0_10/ServerSession.java    |   6 +-
 4 files changed, 110 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 21156de..8d42d2b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -305,6 +305,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
 
 
         _postIdSettingAction.setXfr(xfr);
+        _postIdSettingAction.setAction(null);
+
+        if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+        {
+            entry.incrementDeliveryCount();
+        }
+
         if(_acceptMode == MessageAcceptMode.EXPLICIT)
         {
             _postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
@@ -313,11 +320,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         {
             _postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
         }
-        else
-        {
-            _postIdSettingAction.setAction(null);
-        }
-
 
         _session.sendMessage(xfr, _postIdSettingAction);
         xfr.dispose();
@@ -328,7 +330,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         _postIdSettingAction.setAction(null);
         _postIdSettingAction.setXfr(null);
 
-        entry.incrementDeliveryCount();
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
         {
             forceDequeue(entry, false);
@@ -403,27 +404,14 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
 
     void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
-        entry.setRedelivered();
         if (entry.makeAcquisitionUnstealable(consumer))
         {
             entry.routeToAlternate(null, null);
         }
     }
 
-    void release(final MessageInstanceConsumer consumer,
-                 final MessageInstance entry,
-                 final boolean setRedelivered)
+    void release(final MessageInstanceConsumer consumer, final MessageInstance entry)
     {
-        if (setRedelivered)
-        {
-            entry.setRedelivered();
-        }
-
-        if (getSession().isClosing() || !setRedelivered)
-        {
-            entry.decrementDeliveryCount();
-        }
-
         if (isMaxDeliveryLimitReached(entry))
         {
             sendToDLQOrDiscard(consumer, entry);
@@ -434,7 +422,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
-    protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
+    private void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
     {
         final ServerMessage msg = entry.getMessage();
 
@@ -625,4 +613,102 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
         }
     }
 
+    static abstract class AbstractDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+    {
+        final MessageInstance _entry;
+        final ConsumerTarget_0_10 _target;
+        final MessageInstanceConsumer _consumer;
+
+        AbstractDispositionChangeListener(final MessageInstance entry,
+                                          final ConsumerTarget_0_10 target,
+                                          final MessageInstanceConsumer consumer)
+        {
+            _entry = entry;
+            _target = target;
+            _consumer = consumer;
+        }
+
+        @Override
+        public final void onRelease(boolean setRedelivered, final boolean closing)
+        {
+            _target.release(_consumer, _entry);
+
+            if (setRedelivered)
+            {
+                _entry.setRedelivered();
+            }
+
+            if (closing || !setRedelivered)
+            {
+                _entry.decrementDeliveryCount();
+            }
+        }
+
+        @Override
+        public final void onReject()
+        {
+            _entry.setRedelivered();
+            _target.reject(_consumer, _entry);
+        }
+    }
+
+    static class ImplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+    {
+
+        private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+        ImplicitAcceptDispositionChangeListener(final MessageInstance entry,
+                                                final ConsumerTarget_0_10 target,
+                                                final MessageInstanceConsumer consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+        }
+
+        @Override
+        public boolean acquire()
+        {
+            boolean acquired = _entry.acquire(_consumer);
+            if(acquired)
+            {
+                _entry.incrementDeliveryCount();
+                _target.addUnacknowledgedMessage(_entry);
+            }
+            return acquired;
+        }
+    }
+
+    static class ExplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+    {
+
+        ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+                                                ConsumerTarget_0_10 target,
+                                                final MessageInstanceConsumer consumer)
+        {
+            super(entry, target, consumer);
+        }
+
+        @Override
+        public void onAccept()
+        {
+            _target.acknowledge(_consumer, _entry);
+        }
+        @Override
+        public boolean acquire()
+        {
+            final boolean acquired = _entry.acquire(_consumer);
+            if (acquired)
+            {
+                _entry.incrementDeliveryCount();
+            }
+            return acquired;
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index 427355b..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-
-class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ExplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        _target.acknowledge(_consumer, _entry);
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        return _entry.acquire(_consumer);
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index bcfa205..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
-
-
-    private final MessageInstance _entry;
-    private final ConsumerTarget_0_10 _target;
-    private final MessageInstanceConsumer _consumer;
-
-    public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
-                                                   ConsumerTarget_0_10 target,
-                                                   final MessageInstanceConsumer consumer)
-    {
-        _entry = entry;
-        _target = target;
-        _consumer = consumer;
-    }
-
-    @Override
-    public void onAccept()
-    {
-        LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
-    }
-
-    @Override
-    public void onRelease(boolean setRedelivered)
-    {
-        _target.release(_consumer, _entry, setRedelivered);
-
-    }
-
-    @Override
-    public void onReject()
-    {
-        _target.reject(_consumer, _entry);
-    }
-
-    @Override
-    public boolean acquire()
-    {
-        boolean acquired = _entry.acquire(_consumer);
-        if(acquired)
-        {
-            _target.addUnacknowledgedMessage(_entry);
-        }
-        return acquired;
-
-    }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 32a2474..22d539b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -860,7 +860,7 @@ public class ServerSession extends SessionInvoker
     {
         void onAccept();
 
-        void onRelease(boolean setRedelivered);
+        void onRelease(boolean setRedelivered, final boolean closing);
 
         void onReject();
 
@@ -1012,7 +1012,7 @@ public class ServerSession extends SessionInvoker
                                           @Override
                                           public void performAction(MessageDispositionChangeListener listener)
                                           {
-                                              listener.onRelease(setRedelivered);
+                                              listener.onRelease(setRedelivered, false);
                                           }
                                       });
     }
@@ -1158,7 +1158,7 @@ public class ServerSession extends SessionInvoker
 
         for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
         {
-            listener.onRelease(true);
+            listener.onRelease(true, true);
         }
         _messageDispositionListenerMap.clear();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-10] Add a protocol test supporting explicit message acquisition

Posted by kw...@apache.org.
QPID-8038: [Broker-J] [AMQP 0-10] Add a protocol test supporting explicit message acquisition


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a827841b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a827841b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a827841b

Branch: refs/heads/master
Commit: a827841b24b22cd21f0ca6523da00093f2a26669
Parents: a6408e1
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:53:20 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:53:20 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/MessageInteraction.java      | 20 ++++++
 .../qpid/tests/protocol/v0_10/MessageTest.java  | 71 ++++++++++++++++++++
 2 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
index 54ecf50..43bd2ec 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
 import org.apache.qpid.server.protocol.v0_10.transport.Header;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquire;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCancel;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -42,6 +43,7 @@ public class MessageInteraction
     private MessageCancel _cancel;
     private MessageFlow _flow;
     private MessageAccept _accept;
+    private MessageAcquire _acquire;
 
     public MessageInteraction(final Interaction interaction)
     {
@@ -51,6 +53,7 @@ public class MessageInteraction
         _cancel = new MessageCancel();
         _flow = new MessageFlow();
         _accept = new MessageAccept();
+        _acquire = new MessageAcquire();
     }
 
     public MessageInteraction transferId(final int id)
@@ -199,4 +202,21 @@ public class MessageInteraction
         _accept.setTransfers(transfers);
         return this;
     }
+
+    public Interaction acquire() throws Exception
+    {
+        return _interaction.sendPerformative(_acquire);
+    }
+
+    public MessageInteraction acquireId(final int id)
+    {
+        _acquire.setId(id);
+        return this;
+    }
+
+    public MessageInteraction acquireTransfers(final RangeSet transfers)
+    {
+        _acquire.setTransfers(transfers);
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 4088b9b..6747451 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -35,6 +35,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Acquired;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
 import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -235,6 +237,75 @@ public class MessageTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "10.message.acquire",
+            description = "Acquires previously transferred messages for consumption. The acquired ids (if any) are "
+                          + "sent via message.acquired.")
+    public void acquireTransfer() throws Exception
+    {
+        String testMessageBody = "testMessage";
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            byte[] sessionName = "testSession".getBytes(UTF_8);
+            final String subscriberName = "testSubscriber";
+            interaction.openAnonymousConnection()
+                       .channelId(1)
+                       .attachSession(sessionName)
+                       .message()
+                       .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+                       .subscribeAcquireMode(MessageAcquireMode.NOT_ACQUIRED)
+                       .subscribeDestination(subscriberName)
+                       .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+                       .subscribeId(0)
+                       .subscribe()
+                       .message()
+                       .flowId(1)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.MESSAGE)
+                       .flowValue(1)
+                       .flow()
+                       .message()
+                       .flowId(2)
+                       .flowDestination(subscriberName)
+                       .flowUnit(MessageCreditUnit.BYTE)
+                       .flowValue(-1)
+                       .flow();
+
+            MessageTransfer transfer = consumeResponse(interaction,
+                                                       MessageTransfer.class,
+                                                       SessionCompleted.class,
+                                                       SessionCommandPoint.class,
+                                                       SessionConfirmed.class,
+                                                       SessionFlush.class);
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+            RangeSet transfers = Range.newInstance(transfer.getId());
+            final ExecutionResult result = interaction.message().acquireId(3).acquireTransfers(transfers).acquire()
+                                                      .consumeResponse(SessionFlush.class)
+                                                      .consumeResponse().getLatestResponse(ExecutionResult.class);
+            final Acquired acquired = (Acquired) result.getValue();
+            assertThat(acquired.getTransfers().includes(transfer.getId()), is(equalTo(true)));
+
+            interaction.message().acceptId(4).acceptTransfers(transfers).accept()
+                       .session().flushCompleted()
+                                 .flush();
+
+            SessionCompleted completed = consumeResponse(interaction,
+                                                         SessionCompleted.class,
+                                                         SessionCommandPoint.class,
+                                                         SessionConfirmed.class,
+                                                         SessionFlush.class);
+
+            assertThat(completed.getCommands(), is(notNullValue()));
+            assertThat(completed.getCommands().includes(4), is(equalTo(true)));
+
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+        }
+    }
+
     private <T extends Method> T consumeResponse(final Interaction interaction,
                                                  final Class<T> expected,
                                                  final Class<? extends Method>... ignore)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org