You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/07/03 15:13:59 UTC

qpid-broker-j git commit: QPID-7845: Temporarily report resuming link functionality as unsupported and clean up unsettled deliveries on detach

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 737c52807 -> b2abdd175


QPID-7845: Temporarily report resuming link functionality as unsupported and clean up unsettled deliveries on detach


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b2abdd17
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b2abdd17
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b2abdd17

Branch: refs/heads/master
Commit: b2abdd175e2bdae1f59defd4256c8fc60355111d
Parents: 737c528
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Jul 3 16:13:42 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Jul 3 16:13:42 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     | 10 ++++-
 .../protocol/v1_0/SendingLinkEndpoint.java      | 42 +++++++++++++-------
 .../v1_0/StandardReceivingLinkEndpoint.java     |  7 +++-
 .../protocol/v1_0/messaging/TransferTest.java   |  2 +-
 .../transport/link/ResumeDeliveriesTest.java    |  6 +++
 5 files changed, 48 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 5410766..6664600 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -118,7 +118,15 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         }
         else if (attach.getUnsettled() != null)
         {
-            resumeLink(attach);
+            // TODO: QPID-7845 : Functionality for resuming links is not fully implemented
+            if (attach.getUnsettled().isEmpty())
+            {
+                resumeLink(attach);
+            }
+            else
+            {
+                throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Resuming link is not implemented."));
+            }
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index a78f7f0..0b36c14 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -477,25 +477,16 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         getConsumerTarget().close();
 
+        // TODO: QPID-7845 : Resuming links is unsupported at the moment. Thus, cleaning up unsettled deliveries unconditionally.
+        cleanUpUnsettledDeliveries();
+
         TerminusExpiryPolicy expiryPolicy = (getSource()).getExpiryPolicy();
         if (Boolean.TRUE.equals(detach.getClosed())
             || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
-            || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+            || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
             || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
 
-            Modified state = new Modified();
-            state.setDeliveryFailed(true);
-
-            for (OutgoingDelivery delivery : _unsettled.values())
-            {
-                UnsettledAction action = delivery.getAction();
-                if (action != null)
-                {
-                    action.process(state, Boolean.TRUE);
-                    delivery.setAction(null);
-                }
-            }
 
             Error closingError = null;
             if (getDestination() instanceof ExchangeDestination
@@ -531,10 +522,30 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         else
         {
             detach();
+
+            // TODO: QPID-7845 : Resuming links is unsupported at the moment. Destroying link unconditionally.
+            destroy();
+
             getConsumerTarget().updateNotifyWorkDesired();
         }
     }
 
+    private void cleanUpUnsettledDeliveries()
+    {
+        Modified state = new Modified();
+        state.setDeliveryFailed(true);
+
+        for (OutgoingDelivery delivery : _unsettled.values())
+        {
+            UnsettledAction action = delivery.getAction();
+            if (action != null)
+            {
+                action.process(state, Boolean.TRUE);
+                delivery.setAction(null);
+            }
+        }
+    }
+
     void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance messageInstance)
     {
         _unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
@@ -543,10 +554,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     @Override
     protected void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
-        UnsettledAction action = _unsettled.get(deliveryTag).getAction();
+        OutgoingDelivery outgoingDelivery = _unsettled.get(deliveryTag);
         boolean localSettle = false;
-        if(action != null)
+        if(outgoingDelivery != null && outgoingDelivery.getAction() != null)
         {
+            UnsettledAction action = outgoingDelivery.getAction();
             localSettle = action.process(state, settled);
             if(localSettle && !Boolean.TRUE.equals(settled))
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 447e7cb..22c49b1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -105,7 +105,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         {
             ServerMessage<?> serverMessage;
             UnsignedInteger messageFormat = delivery.getMessageFormat();
-            org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = delivery.getState();
+            DeliveryState xfrState = delivery.getState();
             List<QpidByteBuffer> fragments = delivery.getPayload();
             MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
             if(format != null)
@@ -274,7 +274,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         final TerminusExpiryPolicy expiryPolicy = getTarget().getExpiryPolicy();
         if((detach != null && Boolean.TRUE.equals(detach.getClosed()))
            || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
-           || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+           || ((expiryPolicy == null || TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy)) && getSession().isClosing())
            || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
             close();
@@ -287,6 +287,9 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         else
         {
             detach();
+
+            // TODO: QPID-7845: Resuming links is unsupported at the moment. Destroying link unconditionally.
+            destroy();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 85baddf..1bfe7c0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -507,7 +507,7 @@ public class TransferTest extends ProtocolTestBase
                                                      .flow();
 
             Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
-            assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+            assertThat(data, is(equalTo(TEST_MESSAGE_DATA)));
 
             interaction.dispositionSettled(false)
                        .dispositionRole(Role.RECEIVER)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b2abdd17/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index e85cbc3..6124ae9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.typeCompatibleWith;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
+import static org.junit.Assume.assumeTrue;
 
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
@@ -104,6 +105,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
         }
     }
 
+    @Ignore("QPID-7845")
     @Test
     @SpecificationTest(section = "2.6.13",
             description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
@@ -162,6 +164,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
         }
     }
 
+    @Ignore("QPID-7845")
     @Test
     @SpecificationTest(section = "2.7.3",
             description = "If the local unsettled map is too large to be encoded within a frame of the agreed maximum"
@@ -230,6 +233,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
                 final Error error = responseEnd.getError();
                 assertThat(error, is(notNullValue()));
                 assertThat(error.getCondition().getValue(), is(equalTo(AmqpError.FRAME_SIZE_TOO_SMALL)));
+                assumeTrue("Broker does not support incomplete unsettled",  false);
             }
             else if (latestResponse.getBody() instanceof Attach)
             {
@@ -252,6 +256,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
         }
     }
 
+    @Ignore("QPID-7845")
     @Test
     @SpecificationTest(section = "2.7.3", description =
             "If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
@@ -505,6 +510,7 @@ public class ResumeDeliveriesTest extends ProtocolTestBase
         }
     }
 
+    @Ignore("QPID-7845")
     @Test
     @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
                                                          + " the unsettled field from the attach frame will carry"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org