You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/14 17:06:03 UTC
[1/3] qpid-broker-j git commit: QPID-8083 [System Tests] [REST/HTTP]
Remove keystores/trust stores left behind by the test
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 7601044d1 -> a827841b2
QPID-8083 [System Tests] [REST/HTTP] Remove keystores/trust stores left behind by the test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7851928d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7851928d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7851928d
Branch: refs/heads/master
Commit: 7851928d030754066034bd225084addcbf332d66
Parents: 7601044
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 10:38:44 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 10:38:44 2018 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/tests/http/rest/model/ReadTest.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7851928d/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
index 43ee305..d33f0b1 100644
--- a/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
+++ b/systests/qpid-systests-http-management/src/test/java/org/apache/qpid/tests/http/rest/model/ReadTest.java
@@ -267,6 +267,8 @@ public class ReadTest extends HttpTestBase
final Map<String, Object> store = getHelper().getJsonAsMap(storeUrl);
assertThat(store.get(NonJavaKeyStore.PRIVATE_KEY_URL),
is(equalTo(AbstractConfiguredObject.SECURED_STRING_VALUE)));
+
+ getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
}
{
@@ -279,6 +281,8 @@ public class ReadTest extends HttpTestBase
final Map<String, Object> store = getHelper().getJsonAsMap(storeUrl);
assertThat(store.get(NonJavaKeyStore.PRIVATE_KEY_URL), is(equalTo(privateKeyFileUrl)));
+
+ getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
}
}
finally
@@ -307,6 +311,8 @@ public class ReadTest extends HttpTestBase
final Map<String, Object> full = getHelper().getJsonAsMap(storeUrl + String.format("?oversize=%d", dataUrl.length()));
assertThat(full.get(NonJavaTrustStore.CERTIFICATES_URL), is(equalTo(dataUrl)));
+
+ getHelper().submitRequest(storeUrl, "DELETE", SC_OK);
}
private String createLoggerAndRule(final String loggerName, final String inclusionRuleName) throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-8098: [Broker-J] [AMQP 0-10] No
longer unconditionally increment deliveryCount when
MessageAcquireMode.NOT_ACQUIRED
Posted by kw...@apache.org.
QPID-8098: [Broker-J] [AMQP 0-10] No longer unconditionally increment deliveryCount when MessageAcquireMode.NOT_ACQUIRED
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a6408e15
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a6408e15
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a6408e15
Branch: refs/heads/master
Commit: a6408e156a717e8e91aa95e3220ed300c71c52bd
Parents: 7851928
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:50:42 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:50:42 2018 +0000
----------------------------------------------------------------------
.../protocol/v0_10/ConsumerTarget_0_10.java | 128 ++++++++++++++++---
...ExplicitAcceptDispositionChangeListener.java | 73 -----------
...ImplicitAcceptDispositionChangeListener.java | 80 ------------
.../server/protocol/v0_10/ServerSession.java | 6 +-
4 files changed, 110 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index 21156de..8d42d2b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -305,6 +305,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
_postIdSettingAction.setXfr(xfr);
+ _postIdSettingAction.setAction(null);
+
+ if (_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ entry.incrementDeliveryCount();
+ }
+
if(_acceptMode == MessageAcceptMode.EXPLICIT)
{
_postIdSettingAction.setAction(new ExplicitAcceptDispositionChangeListener(entry, this, consumer));
@@ -313,11 +320,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
{
_postIdSettingAction.setAction(new ImplicitAcceptDispositionChangeListener(entry, this, consumer));
}
- else
- {
- _postIdSettingAction.setAction(null);
- }
-
_session.sendMessage(xfr, _postIdSettingAction);
xfr.dispose();
@@ -328,7 +330,6 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
_postIdSettingAction.setAction(null);
_postIdSettingAction.setXfr(null);
- entry.incrementDeliveryCount();
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
forceDequeue(entry, false);
@@ -403,27 +404,14 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
void reject(final MessageInstanceConsumer consumer, final MessageInstance entry)
{
- entry.setRedelivered();
if (entry.makeAcquisitionUnstealable(consumer))
{
entry.routeToAlternate(null, null);
}
}
- void release(final MessageInstanceConsumer consumer,
- final MessageInstance entry,
- final boolean setRedelivered)
+ void release(final MessageInstanceConsumer consumer, final MessageInstance entry)
{
- if (setRedelivered)
- {
- entry.setRedelivered();
- }
-
- if (getSession().isClosing() || !setRedelivered)
- {
- entry.decrementDeliveryCount();
- }
-
if (isMaxDeliveryLimitReached(entry))
{
sendToDLQOrDiscard(consumer, entry);
@@ -434,7 +422,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
}
}
- protected void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
+ private void sendToDLQOrDiscard(final MessageInstanceConsumer consumer, MessageInstance entry)
{
final ServerMessage msg = entry.getMessage();
@@ -625,4 +613,102 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
}
}
+ static abstract class AbstractDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
+ {
+ final MessageInstance _entry;
+ final ConsumerTarget_0_10 _target;
+ final MessageInstanceConsumer _consumer;
+
+ AbstractDispositionChangeListener(final MessageInstance entry,
+ final ConsumerTarget_0_10 target,
+ final MessageInstanceConsumer consumer)
+ {
+ _entry = entry;
+ _target = target;
+ _consumer = consumer;
+ }
+
+ @Override
+ public final void onRelease(boolean setRedelivered, final boolean closing)
+ {
+ _target.release(_consumer, _entry);
+
+ if (setRedelivered)
+ {
+ _entry.setRedelivered();
+ }
+
+ if (closing || !setRedelivered)
+ {
+ _entry.decrementDeliveryCount();
+ }
+ }
+
+ @Override
+ public final void onReject()
+ {
+ _entry.setRedelivered();
+ _target.reject(_consumer, _entry);
+ }
+ }
+
+ static class ImplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+ {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
+
+
+ ImplicitAcceptDispositionChangeListener(final MessageInstance entry,
+ final ConsumerTarget_0_10 target,
+ final MessageInstanceConsumer consumer)
+ {
+ super(entry, target, consumer);
+ }
+
+ @Override
+ public void onAccept()
+ {
+ LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
+ }
+
+ @Override
+ public boolean acquire()
+ {
+ boolean acquired = _entry.acquire(_consumer);
+ if(acquired)
+ {
+ _entry.incrementDeliveryCount();
+ _target.addUnacknowledgedMessage(_entry);
+ }
+ return acquired;
+ }
+ }
+
+ static class ExplicitAcceptDispositionChangeListener extends AbstractDispositionChangeListener
+ {
+
+ ExplicitAcceptDispositionChangeListener(MessageInstance entry,
+ ConsumerTarget_0_10 target,
+ final MessageInstanceConsumer consumer)
+ {
+ super(entry, target, consumer);
+ }
+
+ @Override
+ public void onAccept()
+ {
+ _target.acknowledge(_consumer, _entry);
+ }
+ @Override
+ public boolean acquire()
+ {
+ final boolean acquired = _entry.acquire(_consumer);
+ if (acquired)
+ {
+ _entry.incrementDeliveryCount();
+ }
+ return acquired;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index 427355b..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-
-class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ExplicitAcceptDispositionChangeListener.class);
-
-
- private final MessageInstance _entry;
- private final ConsumerTarget_0_10 _target;
- private final MessageInstanceConsumer _consumer;
-
- public ExplicitAcceptDispositionChangeListener(MessageInstance entry,
- ConsumerTarget_0_10 target,
- final MessageInstanceConsumer consumer)
- {
- _entry = entry;
- _target = target;
- _consumer = consumer;
- }
-
- @Override
- public void onAccept()
- {
- _target.acknowledge(_consumer, _entry);
- }
-
- @Override
- public void onRelease(boolean setRedelivered)
- {
- _target.release(_consumer, _entry, setRedelivered);
- }
-
- @Override
- public void onReject()
- {
- _target.reject(_consumer, _entry);
- }
-
- @Override
- public boolean acquire()
- {
- return _entry.acquire(_consumer);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
deleted file mode 100755
index bcfa205..0000000
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_10;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-
-class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDispositionChangeListener
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ImplicitAcceptDispositionChangeListener.class);
-
-
- private final MessageInstance _entry;
- private final ConsumerTarget_0_10 _target;
- private final MessageInstanceConsumer _consumer;
-
- public ImplicitAcceptDispositionChangeListener(MessageInstance entry,
- ConsumerTarget_0_10 target,
- final MessageInstanceConsumer consumer)
- {
- _entry = entry;
- _target = target;
- _consumer = consumer;
- }
-
- @Override
- public void onAccept()
- {
- LOGGER.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
- }
-
- @Override
- public void onRelease(boolean setRedelivered)
- {
- _target.release(_consumer, _entry, setRedelivered);
-
- }
-
- @Override
- public void onReject()
- {
- _target.reject(_consumer, _entry);
- }
-
- @Override
- public boolean acquire()
- {
- boolean acquired = _entry.acquire(_consumer);
- if(acquired)
- {
- _target.addUnacknowledgedMessage(_entry);
- }
- return acquired;
-
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a6408e15/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 32a2474..22d539b 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -860,7 +860,7 @@ public class ServerSession extends SessionInvoker
{
void onAccept();
- void onRelease(boolean setRedelivered);
+ void onRelease(boolean setRedelivered, final boolean closing);
void onReject();
@@ -1012,7 +1012,7 @@ public class ServerSession extends SessionInvoker
@Override
public void performAction(MessageDispositionChangeListener listener)
{
- listener.onRelease(setRedelivered);
+ listener.onRelease(setRedelivered, false);
}
});
}
@@ -1158,7 +1158,7 @@ public class ServerSession extends SessionInvoker
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
- listener.onRelease(true);
+ listener.onRelease(true, true);
}
_messageDispositionListenerMap.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-10] Add
a protocol test supporting explicit message acquisition
Posted by kw...@apache.org.
QPID-8038: [Broker-J] [AMQP 0-10] Add a protocol test supporting explicit message acquisition
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/a827841b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/a827841b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/a827841b
Branch: refs/heads/master
Commit: a827841b24b22cd21f0ca6523da00093f2a26669
Parents: a6408e1
Author: Keith Wall <kw...@apache.org>
Authored: Wed Feb 14 15:53:20 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Feb 14 15:53:20 2018 +0000
----------------------------------------------------------------------
.../protocol/v0_10/MessageInteraction.java | 20 ++++++
.../qpid/tests/protocol/v0_10/MessageTest.java | 71 ++++++++++++++++++++
2 files changed, 91 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
index 54ecf50..43bd2ec 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -25,6 +25,7 @@ import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
import org.apache.qpid.server.protocol.v0_10.transport.Header;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquire;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCancel;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -42,6 +43,7 @@ public class MessageInteraction
private MessageCancel _cancel;
private MessageFlow _flow;
private MessageAccept _accept;
+ private MessageAcquire _acquire;
public MessageInteraction(final Interaction interaction)
{
@@ -51,6 +53,7 @@ public class MessageInteraction
_cancel = new MessageCancel();
_flow = new MessageFlow();
_accept = new MessageAccept();
+ _acquire = new MessageAcquire();
}
public MessageInteraction transferId(final int id)
@@ -199,4 +202,21 @@ public class MessageInteraction
_accept.setTransfers(transfers);
return this;
}
+
+ public Interaction acquire() throws Exception
+ {
+ return _interaction.sendPerformative(_acquire);
+ }
+
+ public MessageInteraction acquireId(final int id)
+ {
+ _acquire.setId(id);
+ return this;
+ }
+
+ public MessageInteraction acquireTransfers(final RangeSet transfers)
+ {
+ _acquire.setTransfers(transfers);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/a827841b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 4088b9b..6747451 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -35,6 +35,8 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.Acquired;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionResult;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
@@ -235,6 +237,75 @@ public class MessageTest extends BrokerAdminUsingTestBase
}
}
+ @Test
+ @SpecificationTest(section = "10.message.acquire",
+ description = "Acquires previously transferred messages for consumption. The acquired ids (if any) are "
+ + "sent via message.acquired.")
+ public void acquireTransfer() throws Exception
+ {
+ String testMessageBody = "testMessage";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "testSession".getBytes(UTF_8);
+ final String subscriberName = "testSubscriber";
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .message()
+ .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+ .subscribeAcquireMode(MessageAcquireMode.NOT_ACQUIRED)
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .message()
+ .flowId(1)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.MESSAGE)
+ .flowValue(1)
+ .flow()
+ .message()
+ .flowId(2)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.BYTE)
+ .flowValue(-1)
+ .flow();
+
+ MessageTransfer transfer = consumeResponse(interaction,
+ MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ RangeSet transfers = Range.newInstance(transfer.getId());
+ final ExecutionResult result = interaction.message().acquireId(3).acquireTransfers(transfers).acquire()
+ .consumeResponse(SessionFlush.class)
+ .consumeResponse().getLatestResponse(ExecutionResult.class);
+ final Acquired acquired = (Acquired) result.getValue();
+ assertThat(acquired.getTransfers().includes(transfer.getId()), is(equalTo(true)));
+
+ interaction.message().acceptId(4).acceptTransfers(transfers).accept()
+ .session().flushCompleted()
+ .flush();
+
+ SessionCompleted completed = consumeResponse(interaction,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
+
+ assertThat(completed.getCommands(), is(notNullValue()));
+ assertThat(completed.getCommands().includes(4), is(equalTo(true)));
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
private <T extends Method> T consumeResponse(final Interaction interaction,
final Class<T> expected,
final Class<? extends Method>... ignore)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org