You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/07/13 18:57:36 UTC

qpid-proton git commit: PROTON-944: add ability to set a default state for use when settling/freeing received deliveries without having previously set/sent disposition state for them

Repository: qpid-proton
Updated Branches:
  refs/heads/master 6d873ebed -> b67a2a943


PROTON-944: add ability to set a default state for use when settling/freeing received deliveries without having previously set/sent disposition state for them


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b67a2a94
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b67a2a94
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b67a2a94

Branch: refs/heads/master
Commit: b67a2a943017910bcf8bf67a05aafed93ab7b8b1
Parents: 6d873eb
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Jul 13 17:28:31 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Jul 13 17:55:49 2015 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/engine/Delivery.java |  10 +
 .../qpid/proton/engine/impl/DeliveryImpl.java   |  14 +
 .../qpid/proton/engine/impl/TransportImpl.java  |  14 +-
 .../systemtests/DefaultDeliveryStateTest.java   | 288 +++++++++++++++++++
 4 files changed, 323 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index d08efc2..71932ed 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -108,4 +108,14 @@ public interface Delivery extends Extendable
 
     public boolean isBuffered();
 
+    /**
+     * Configures the fallback DeliveryState reported to the peer when a
+     * received delivery is settled/freed while its local disposition
+     * state is still unset; it does not change the local state itself.
+     *
+     * @param state the default delivery state; may be null (no fallback)
+     */
+    public void setDefaultDeliveryState(DeliveryState state);
+
+    public DeliveryState getDefaultDeliveryState(); // configured default; null when none was set (as in DeliveryImpl)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 3136d7a..896d8a9 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -49,6 +49,7 @@ public class DeliveryImpl implements Delivery
     private boolean _settled;
     private boolean _remoteSettled;
     private DeliveryState _remoteDeliveryState;
+    private DeliveryState _defaultDeliveryState = null;
 
     private static final int DELIVERY_STATE_CHANGED = 1;
     private static final int ABLE_TO_SEND = 2;
@@ -420,6 +421,7 @@ public class DeliveryImpl implements Delivery
             .append(", _remoteSettled=").append(_remoteSettled)
             .append(", _remoteDeliveryState=").append(_remoteDeliveryState)
             .append(", _flags=").append(_flags)
+            .append(", _defaultDeliveryState=").append(_defaultDeliveryState)
             .append(", _transportDelivery=").append(_transportDelivery)
             .append(", _dataSize=").append(_dataSize)
             .append(", _complete=").append(_complete)
@@ -434,4 +436,16 @@ public class DeliveryImpl implements Delivery
         return _dataSize;
     }
 
+    @Override
+    public void setDefaultDeliveryState(DeliveryState state)
+    {
+        _defaultDeliveryState = state; // stored only; the delivery's local state is not modified here
+    }
+
+    @Override
+    public DeliveryState getDefaultDeliveryState()
+    {
+        return _defaultDeliveryState; // null until a default has been configured
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 9cc73ce..0902a55 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
 import org.apache.qpid.proton.amqp.transport.ConnectionError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.Detach;
 import org.apache.qpid.proton.amqp.transport.Disposition;
 import org.apache.qpid.proton.amqp.transport.End;
@@ -640,15 +641,22 @@ public class TransportImpl extends EndpointImpl
 
         if (tpSession.isLocalChannelSet())
         {
+            boolean settled = delivery.isSettled();
+            DeliveryState localState = delivery.getLocalState();
+
             Disposition disposition = new Disposition();
             disposition.setFirst(tpDelivery.getDeliveryId());
             disposition.setLast(tpDelivery.getDeliveryId());
             disposition.setRole(Role.RECEIVER);
-            disposition.setSettled(delivery.isSettled());
+            disposition.setSettled(settled);
+            disposition.setState(localState);
+
+            if(localState == null && settled) {
+                disposition.setState(delivery.getDefaultDeliveryState());
+            }
 
-            disposition.setState(delivery.getLocalState());
             writeFrame(tpSession.getLocalChannel(), disposition, null, null);
-            if (delivery.isSettled())
+            if (settled)
             {
                 tpDelivery.settled();
             }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b67a2a94/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java
new file mode 100644
index 0000000..3ad9d3a
--- /dev/null
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/DefaultDeliveryStateTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.systemtests;
+
+import static java.util.EnumSet.of;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
+import static org.apache.qpid.proton.engine.EndpointState.CLOSED;
+import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
+import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
+import static org.junit.Assert.assertNull;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Released;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
+import org.junit.Test;
+
+
+public class DefaultDeliveryStateTest extends EngineTestBase
+{ // Checks that a delivery's default state is what the sender sees once the receiver link is freed without an explicit disposition
+    private static final Logger LOGGER = Logger.getLogger(DefaultDeliveryStateTest.class.getName());
+
+    private static final int BUFFER_SIZE = 4096;
+
+    private final String _sourceAddress = getServer().containerId + "-link1-source";
+
+    @Test
+    public void testDefaultDeliveryState() throws Exception
+    {
+        LOGGER.fine(bold("======== About to create transports"));
+
+        getClient().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+        getServer().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        doOutputInputCycle();
+
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
+
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
+
+
+
+        LOGGER.fine(bold("======== About to open connections"));
+        getClient().connection.open();
+        getServer().connection.open();
+
+        doOutputInputCycle();
+
+
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
+
+        pumpClientToServer();
+
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+
+        LOGGER.fine(bold("======== About to create reciever"));
+
+        getClient().source = new Source();
+        getClient().source.setAddress(_sourceAddress);
+
+        getClient().target = new Target();
+        getClient().target.setAddress(null);
+
+        getClient().receiver = getClient().session.receiver("link1");
+        getClient().receiver.setTarget(getClient().target);
+        getClient().receiver.setSource(getClient().source);
+
+        getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+        assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().receiver.open();
+        assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+
+        LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+        getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+        getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+        getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+        getServer().sender.setSource(serverRemoteSource);
+
+        assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+        getServer().sender.open();
+
+        assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+
+        assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+        int messagCount = 3; // credit for the three deliveries sent below
+        getClient().receiver.flow(messagCount);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+        Delivery serverDelivery1 = sendMessageToClient("delivery1", "Msg1");
+        Delivery serverDelivery2 = sendMessageToClient("delivery2", "Msg2");
+        Delivery serverDelivery3 = sendMessageToClient("delivery3", "Msg3");
+
+        pumpServerToClient();
+
+        assertNull(serverDelivery1.getLocalState());
+        assertNull(serverDelivery2.getLocalState());
+        assertNull(serverDelivery3.getLocalState());
+
+        assertNull(serverDelivery1.getRemoteState());
+        assertNull(serverDelivery2.getRemoteState());
+        assertNull(serverDelivery3.getRemoteState());
+
+        LOGGER.fine(bold("======== About to process the messages on the client"));
+
+        Delivery clientDelivery1 = receiveMessageFromServer("delivery1", "Msg1");
+        Delivery clientDelivery2 = receiveMessageFromServer("delivery2", "Msg2");
+        Delivery clientDelivery3 = receiveMessageFromServer("delivery3", "Msg3");
+
+        // Configure Released as the default (fallback) state on all three deliveries
+        clientDelivery1.setDefaultDeliveryState(Released.getInstance());
+        clientDelivery2.setDefaultDeliveryState(Released.getInstance());
+        clientDelivery3.setDefaultDeliveryState(Released.getInstance());
+
+        assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState());
+        assertEquals(Released.getInstance(), clientDelivery2.getDefaultDeliveryState());
+        assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState());
+
+        // Setting a default must not alter the actual local or remote state
+        assertNull(clientDelivery1.getLocalState());
+        assertNull(clientDelivery2.getLocalState());
+        assertNull(clientDelivery3.getLocalState());
+
+        assertNull(clientDelivery1.getRemoteState());
+        assertNull(clientDelivery2.getRemoteState());
+        assertNull(clientDelivery3.getRemoteState());
+
+        // Explicitly accept the first, swap the second's default to Modified, leave the third untouched
+        clientDelivery1.disposition(Accepted.getInstance());
+        clientDelivery2.setDefaultDeliveryState(new Modified());
+
+        // Only delivery2's default should have changed; only delivery1 should have gained a local state
+        assertEquals(Released.getInstance(), clientDelivery1.getDefaultDeliveryState());
+        assertTrue(clientDelivery2.getDefaultDeliveryState() instanceof Modified);
+        assertEquals(Released.getInstance(), clientDelivery3.getDefaultDeliveryState());
+
+        assertEquals(Accepted.getInstance(), clientDelivery1.getLocalState());
+        assertNull(clientDelivery2.getLocalState());
+        assertNull(clientDelivery3.getLocalState());
+
+        // Before the link is freed, the server should see only the explicit Accepted
+        pumpClientToServer();
+
+        assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState());
+        assertNull(serverDelivery2.getRemoteState());
+        assertNull(serverDelivery3.getRemoteState());
+
+        // Confirm the server sees the default states (Modified, Released) for the second
+        // and third messages once they get settled as part of freeing the receiver link
+        getClient().receiver.close();
+        assertEndpointState(getClient().receiver, CLOSED, ACTIVE);
+
+        pumpClientToServer();
+
+        assertEndpointState(getServer().sender, ACTIVE, CLOSED);
+        getServer().sender.close();
+
+        assertEndpointState(getServer().sender, CLOSED, CLOSED);
+
+        pumpServerToClient();
+
+        getClient().receiver.free();
+
+        assertEndpointState(getClient().receiver, CLOSED, CLOSED);
+
+        pumpClientToServer();
+
+        assertEquals(Accepted.getInstance(), serverDelivery1.getRemoteState());
+        assertTrue(serverDelivery2.getRemoteState() instanceof Modified);
+        assertEquals(Released.getInstance(), serverDelivery3.getRemoteState());
+    }
+
+    private Delivery receiveMessageFromServer(String deliveryTag, String messageContent)
+    { // Takes the client's work-head delivery, asserts its tag, link and decoded body, advances the receiver, returns the delivery
+        Delivery delivery = getClient().connection.getWorkHead();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+        assertEquals("The received delivery should be on our receiver",
+                            getClient().receiver, delivery.getLink());
+
+        assertNull(delivery.getLocalState());
+        assertNull(delivery.getRemoteState());
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[BUFFER_SIZE];
+        int len = getClient().receiver.recv(received, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Message m = Proton.message();
+        m.decode(received, 0, len);
+
+        Object messageBody = ((AmqpValue)m.getBody()).getValue();
+        assertEquals("Unexpected message content", messageContent, messageBody);
+
+        boolean receiverAdvanced = getClient().receiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        return delivery;
+    }
+
+    private Delivery sendMessageToClient(String deliveryTag, String messageBody)
+    { // Encodes an AmqpValue message, sends it on the server's sender under the given tag, advances the sender, returns the delivery
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Message m = Proton.message();
+        m.setBody(new AmqpValue(messageBody));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Delivery serverDelivery = getServer().sender.delivery(tag);
+        int sent = getServer().sender.send(encoded, 0, len);
+
+        assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
+
+        boolean senderAdvanced = getServer().sender.advance();
+        assertTrue("sender has not advanced", senderAdvanced);
+
+        return serverDelivery;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org