You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/07/03 15:13:59 UTC
qpid-broker-j git commit: QPID-7845: Temporarily report resuming link functionality as unsupported and clean up unsettled deliveries on detach
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 737c52807 -> b2abdd175
QPID-7845: Temporarily report resuming link functionality as unsupported and clean up unsettled deliveries on detach
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b2abdd17
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b2abdd17
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b2abdd17
Branch: refs/heads/master
Commit: b2abdd175e2bdae1f59defd4256c8fc60355111d
Parents: 737c528
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jul 3 16:13:42 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jul 3 16:13:42 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 10 ++++-
.../protocol/v1_0/SendingLinkEndpoint.java | 42 +++++++++++++-------
.../v1_0/StandardReceivingLinkEndpoint.java | 7 +++-
.../protocol/v1_0/messaging/TransferTest.java | 2 +-
.../transport/link/ResumeDeliveriesTest.java | 6 +++
5 files changed, 48 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 5410766..6664600 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -118,7 +118,15 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
else if (attach.getUnsettled() != null)
{
- resumeLink(attach);
+ // TODO: QPID-7845 : Functionality for resuming links is not fully implemented
+ if (attach.getUnsettled().isEmpty())
+ {
+ resumeLink(attach);
+ }
+ else
+ {
+ throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Resuming link is not implemented."));
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a78f7f0..0b36c14 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -477,25 +477,16 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
getConsumerTarget().close();
+ // TODO: QPID-7845 : Resuming links is unsupported at the moment. Thus, cleaning up unsettled deliveries unconditionally.
+ cleanUpUnsettledDeliveries();
+
TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy();
if (Boolean.TRUE.equals(detach.getClosed())
|| TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
- || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+ || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
|| (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
- Modified state = new Modified();
- state.setDeliveryFailed(true);
-
- for (OutgoingDelivery delivery : _unsettled.values())
- {
- UnsettledAction action = delivery.getAction();
- if (action != null)
- {
- action.process(state, Boolean.TRUE);
- delivery.setAction(null);
- }
- }
Error closingError = null;
if (getDestination() instanceof ExchangeDestination
@@ -531,10 +522,30 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
else
{
detach();
+
+ // TODO: QPID-7845 : Resuming links is unsupported at the moment. Destroying link unconditionally.
+ destroy();
+
getConsumerTarget().updateNotifyWorkDesired();
}
}
+ private void cleanUpUnsettledDeliveries()
+ {
+ Modified state = new Modified();
+ state.setDeliveryFailed(true);
+
+ for (OutgoingDelivery delivery : _unsettled.values())
+ {
+ UnsettledAction action = delivery.getAction();
+ if (action != null)
+ {
+ action.process(state, Boolean.TRUE);
+ delivery.setAction(null);
+ }
+ }
+ }
+
void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance messageInstance)
{
_unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
@@ -543,10 +554,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
@Override
protected void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- UnsettledAction action = _unsettled.get(deliveryTag).getAction();
+ OutgoingDelivery outgoingDelivery = _unsettled.get(deliveryTag);
boolean localSettle = false;
- if(action != null)
+ if(outgoingDelivery != null && outgoingDelivery.getAction() != null)
{
+ UnsettledAction action = outgoingDelivery.getAction();
localSettle = action.process(state, settled);
if(localSettle && !Boolean.TRUE.equals(settled))
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 447e7cb..22c49b1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -105,7 +105,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
ServerMessage<?> serverMessage;
UnsignedInteger messageFormat = delivery.getMessageFormat();
- org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = delivery.getState();
+ DeliveryState xfrState = delivery.getState();
List<QpidByteBuffer> fragments = delivery.getPayload();
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
if(format != null)
@@ -274,7 +274,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
final TerminusExpiryPolicy expiryPolicy = getTarget().getExpiryPolicy();
if((detach != null && Boolean.TRUE.equals(detach.getClosed()))
|| TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
- || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+ || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
|| (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
close();
@@ -287,6 +287,9 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
else
{
detach();
+
+ // TODO: QPID-7845: Resuming links is unsupported at the moment. Destroying link unconditionally.
+ destroy();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 85baddf..1bfe7c0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -507,7 +507,7 @@ public class TransferTest extends ProtocolTestBase
.flow();
Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
- assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ assertThat(data, is(equalTo(TEST_MESSAGE_DATA)));
interaction.dispositionSettled(false)
.dispositionRole(Role.RECEIVER)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index e85cbc3..6124ae9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.typeCompatibleWith;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
+import static org.junit.Assume.assumeTrue;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@@ -104,6 +105,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
}
}
+ @Ignore("QPID-7845")
@Test
@SpecificationTest(section = "2.6.13",
description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
@@ -162,6 +164,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
}
}
+ @Ignore("QPID-7845")
@Test
@SpecificationTest(section = "2.7.3",
description = "If the local unsettled map is too large to be encoded within a frame of the agreed maximum"
@@ -230,6 +233,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
final Error error = responseEnd.getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition().getValue(), is(equalTo(AmqpError.FRAME_SIZE_TOO_SMALL)));
+ assumeTrue("Broker does not support incomplete unsettled", false);
}
else if (latestResponse.getBody() instanceof Attach)
{
@@ -252,6 +256,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
}
}
+ @Ignore("QPID-7845")
@Test
@SpecificationTest(section = "2.7.3", description =
"If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
@@ -505,6 +510,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
}
}
+ @Ignore("QPID-7845")
@Test
@SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+ " the unsettled field from the attach frame will carry"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org