You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/18 21:10:31 UTC

[qpid-protonj2] branch main updated: PROTON-2547 Add nextReceiver API to session and connection

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 62e0584b PROTON-2547 Add nextReceiver API to session and connection
62e0584b is described below

commit 62e0584b6697de1d57f06f07eb2c4c94e89d634a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 18 16:43:22 2022 -0400

    PROTON-2547 Add nextReceiver API to session and connection
    
    Allow a user to poll a session or the base connection's default session
    for the next receiver that has a delivery or wait for a delivery to
    arrive for any of the session receivers.
---
 .../protonj2/client/examples/NextReceiver.java     |  68 ++
 .../apache/qpid/protonj2/client/Connection.java    |  66 ++
 .../qpid/protonj2/client/ConnectionOptions.java    |  24 +
 .../qpid/protonj2/client/NextReceiverPolicy.java   |  78 ++
 .../org/apache/qpid/protonj2/client/Session.java   |  67 ++
 .../qpid/protonj2/client/SessionOptions.java       |  23 +
 .../protonj2/client/impl/ClientConnection.java     |  25 +
 .../qpid/protonj2/client/impl/ClientLinkType.java  |   8 +-
 .../client/impl/ClientNextReceiverSelector.java    | 207 +++++
 .../qpid/protonj2/client/impl/ClientReceiver.java  |  34 +-
 .../client/impl/ClientReceiverLinkType.java        |   1 +
 .../qpid/protonj2/client/impl/ClientSession.java   |  53 +-
 .../protonj2/client/impl/ClientSessionBuilder.java |   1 +
 .../qpid/protonj2/client/impl/ConnectionTest.java  | 138 +++
 .../qpid/protonj2/client/impl/ReceiverTest.java    | 234 +++--
 .../protonj2/client/impl/ReconnectSessionTest.java | 100 +++
 .../qpid/protonj2/client/impl/SessionTest.java     | 990 +++++++++++++++++++++
 .../org/apache/qpid/protonj2/engine/Session.java   |  19 +-
 .../protonj2/engine/impl/ProtonConnection.java     |   9 +-
 .../qpid/protonj2/engine/impl/ProtonReceiver.java  |   6 +
 .../qpid/protonj2/engine/impl/ProtonSession.java   |  22 +-
 .../protonj2/engine/impl/ProtonSessionTest.java    |  37 +
 22 files changed, 2059 insertions(+), 151 deletions(-)

diff --git a/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java b/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java
new file mode 100644
index 00000000..3282802d
--- /dev/null
+++ b/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protonj2.client.examples;
+
+import java.util.concurrent.ForkJoinPool;
+
+import org.apache.qpid.protonj2.client.Client;
+import org.apache.qpid.protonj2.client.Connection;
+import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
+import org.apache.qpid.protonj2.client.Message;
+
+public class NextReceiver {
+
+    public static void main(String[] args) throws Exception {
+        final String serverHost = System.getProperty("HOST", "localhost");
+        final int serverPort = Integer.getInteger("PORT", 5672);
+        final String address1 = System.getProperty("ADDRESS1", "next-receiver-1-address");
+        final String address2 = System.getProperty("ADDRESS2", "next-receiver-2-address");
+
+        final Client client = Client.create();
+
+        final ConnectionOptions options = new ConnectionOptions();
+        options.user(System.getProperty("USER"));
+        options.password(System.getProperty("PASSWORD"));
+
+        try (Connection connection = client.connect(serverHost, serverPort, options)) {
+
+            connection.openReceiver(address1);
+            connection.openReceiver(address2);
+
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    Thread.sleep(2000);
+                    connection.send(Message.create("Hello World 1").to(address1));
+                    Thread.sleep(2000);
+                    connection.send(Message.create("Hello World 2").to(address2));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            });
+
+            final Delivery delivery1 = connection.nextReceiver().receive();
+            final Delivery delivery2 = connection.nextReceiver().receive();
+
+            System.out.println("Received first message with body: " + delivery1.message().body());
+            System.out.println("Received second message with body: " + delivery2.message().body());
+        }
+    }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
index c8ffe356..2cac3b4e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.client;
 
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
@@ -460,6 +461,71 @@ public interface Connection extends AutoCloseable {
      */
     Tracker send(Message<?> message) throws ClientException;
 
+    /**
+     * Waits indefinitely for a receiver created from the connection default session to have a
+     * delivery ready for receipt. The selection of the next receiver when more than one exists
+     * which has pending deliveries is based upon the configured value of the
+     * {@link ConnectionOptions#defaultNextReceiverPolicy()}.
+     *
+     * @return the next receiver that has a pending delivery available based on policy.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver() throws ClientException;
+
+    /**
+     * Waits indefinitely for a receiver created from the connection default session to have a
+     * delivery ready for receipt. The selection of the next receiver when more than one exists
+     * which has pending deliveries is based upon the value of the {@link NextReceiverPolicy}
+     * that is provided by the caller.
+     *
+     * @param policy
+     *      The policy to apply when selecting the next receiver.
+     *
+     * @return the next receiver that has a pending delivery available based on policy.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException;
+
+    /**
+     * Waits for the given time interval for a receiver created from the connection default
+     * session to have a delivery ready for receipt. The selection of the next receiver when more
+     * than one exists which has pending deliveries is based upon the configured value of the
+     * {@link ConnectionOptions#defaultNextReceiverPolicy()}. If no receiver has an incoming delivery
+     * before the given timeout expires the method returns null.
+     *
+     * @param timeout
+     *      The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+     * @param unit
+     *      The unit of time that the given timeout represents.
+     *
+     * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException;
+
+    /**
+     * Waits for the given time interval for a receiver created from the connection default
+     * session to have a delivery ready for receipt. The selection of the next receiver when more
+     * than one exists which has pending deliveries is based upon the {@link NextReceiverPolicy}
+     * provided by the caller. If no receiver has an incoming delivery before the given timeout
+     * expires the method returns null.
+     *
+     * @param policy
+     *      The policy to apply when selecting the next receiver.
+     * @param timeout
+     *      The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+     * @param unit
+     *      The unit of time that the given timeout represents.
+     *
+     * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException;
+
     /**
      * Returns the properties that the remote provided upon successfully opening the {@link Connection}.  If the
      * open has not completed yet this method will block to await the open response which carries the remote
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
index a08105ff..da4bf222 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
@@ -50,6 +50,7 @@ public class ConnectionOptions {
     public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
     public static final int DEFAULT_CHANNEL_MAX = 65535;
     public static final int DEFAULT_MAX_FRAME_SIZE = 65536;
+    public static final NextReceiverPolicy DEFAULT_NEXT_RECEIVER_POLICY = NextReceiverPolicy.ROUND_ROBIN;
 
     private long sendTimeout = DEFAULT_SEND_TIMEOUT;
     private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
@@ -72,6 +73,7 @@ public class ConnectionOptions {
     private Map<String, Object> properties;
     private String virtualHost;
     private boolean traceFrames;
+    private NextReceiverPolicy nextReceiverPolicy = DEFAULT_NEXT_RECEIVER_POLICY;
 
     private BiConsumer<Connection, ConnectionEvent> connectedHandler;
     private BiConsumer<Connection, DisconnectionEvent> disconnectedHandler;
@@ -127,6 +129,7 @@ public class ConnectionOptions {
         other.interruptedHandler(interruptedHandler);
         other.reconnectedHandler(reconnectedHandler);
         other.disconnectedHandler(disconnectedHandler);
+        other.defaultNextReceiverPolicy(nextReceiverPolicy);
 
         if (offeredCapabilities != null) {
             other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
@@ -646,6 +649,27 @@ public class ConnectionOptions {
         return this;
     }
 
+    /**
+     * @return the configured default next receiver policy for the connection.
+     */
+    public NextReceiverPolicy defaultNextReceiverPolicy() {
+        return nextReceiverPolicy;
+    }
+
+    /**
+     * Configures the default next receiver policy for this connection and any session
+     * that is created without specifying user defined session default options.
+     *
+     * @param policy
+     *      The next receiver policy to assign as the default.
+     *
+     * @return this {@link ConnectionOptions} instance.
+     */
+    public ConnectionOptions defaultNextReceiverPolicy(NextReceiverPolicy policy) {
+        this.nextReceiverPolicy = policy;
+        return this;
+    }
+
     /**
      * @return the connection failed handler currently registered.
      */
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java
new file mode 100644
index 00000000..92c232d3
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client;
+
+/**
+ * Determines the behavior of a Session when the next receiver method is called
+ * on that session. Each policy provides a contract on the ordering of returned
+ * receivers from the next receiver API when there are receivers with locally
+ * queued deliveries. When there are no {@link Receiver} instances that have
+ * locally queued deliveries the next receiver API will return the next receiver
+ * to receive a complete incoming delivery unless a timeout was given and that
+ * time period expires in which case it will return <code>null</code>.
+ * <p>
+ * Should the user perform receive calls on a {@link Receiver} directly in multiple
+ * threads the behavior of the next receiver API is undefined and it becomes possible
+ * that the resulting receiver returned from that API will have no actual pending
+ * deliveries due to a race. In most cases the caller can mitigate some risk by using
+ * the {@link Receiver#tryReceive()} API and accounting for a null result.
+ */
+public enum NextReceiverPolicy {
+
+    /**
+     * Examines the list of currently open receivers in the session and returns
+     * the next receiver that has a pending delivery that follows the previously
+     * returned receiver (if any) otherwise the first receiver in the session with
+     * a pending delivery is returned. The order of receivers returned will likely
+     * be creation order however the implementation is not required to follow this
+     * pattern so the caller should not be coded to rely on that ordering.
+     */
+    ROUND_ROBIN,
+
+    /**
+     * Examines the list of currently open receivers in the session and returns a
+     * random selection from the set of receivers that have a pending delivery
+     * immediately available. This provides a means of selecting receivers which
+     * is not prone to sticking to a highly active receiver which can starve out
+     * other receivers which receive only limited traffic.
+     */
+    RANDOM,
+
+    /**
+     * Examines the list of currently open receivers in the session and returns the
+     * first receiver found with an available delivery. This can result in starvation
+     * if that receiver has a continuous feed of new deliveries from the remote as it
+     * will be repeatedly selected by the next receiver API.
+     */
+    FIRST_AVAILABLE,
+
+    /**
+     * Examines the list of currently open receivers in the session and returns the
+     * receiver with the largest backlog of available deliveries. This can result in
+     * starvation if that receiver has a continuous feed of new deliveries from the
+     * remote as it will likely be repeatedly selected by the next receiver API.
+     */
+    LARGEST_BACKLOG,
+
+    /**
+     * Examines the list of currently open receivers in the session and returns the
+     * receiver with the smallest backlog of available deliveries.
+     */
+    SMALLEST_BACKLOG
+
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
index 18260da4..568b93df 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.client;
 
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientTransactionNotActiveException;
@@ -326,4 +327,70 @@ public interface Session extends AutoCloseable {
      */
     Session rollbackTransaction() throws ClientException;
 
+    /**
+     * Waits indefinitely for a receiver created from this session to have a delivery ready for
+     * receipt. The selection of the next receiver when more than one exists which has pending
+     * deliveries is based upon the configured value of the {@link SessionOptions#defaultNextReceiverPolicy()}
+     * used to create this session or if none was provided then the value is taken from the value
+     * of the {@link ConnectionOptions#defaultNextReceiverPolicy()}.
+     *
+     * @return the next receiver that has a pending delivery available based on policy.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver() throws ClientException;
+
+    /**
+     * Waits indefinitely for a receiver created from this session to have a delivery ready for
+     * receipt. The selection of the next receiver when more than one exists which has pending
+     * deliveries is based upon the value of the {@link NextReceiverPolicy} that is provided by
+     * the caller.
+     *
+     * @param policy
+     *      The policy to apply when selecting the next receiver.
+     *
+     * @return the next receiver that has a pending delivery available based on policy.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException;
+
+    /**
+     * Waits for the given duration for a receiver created from this session to have a delivery ready
+     * for receipt. The selection of the next receiver when more than one exists which has pending
+     * deliveries is based upon the configured value of the {@link SessionOptions#defaultNextReceiverPolicy()}
+     * used to create this session or if none was provided then the value is taken from the value
+     * of the {@link ConnectionOptions#defaultNextReceiverPolicy()}. If no receiver has an available
+     * delivery within the given timeout this method returns null.
+     *
+     * @param timeout
+     *      The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+     * @param unit
+     *      The unit of time that the given timeout represents.
+     *
+     * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException;
+
+    /**
+     * Waits for the given duration for a receiver created from this session to have a delivery ready
+     * for receipt. The selection of the next receiver when more than one exists which has pending
+     * deliveries is based upon the value of the {@link NextReceiverPolicy} provided by the caller. If
+     * no receiver has an available delivery within the given timeout this method returns null.
+     *
+     * @param policy
+     *      The policy to apply when selecting the next receiver.
+     * @param timeout
+     *      The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+     * @param unit
+     *      The unit of time that the given timeout represents.
+     *
+     * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+     *
+     * @throws ClientException if an internal error occurs.
+     */
+    Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException;
+
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
index a61221c3..dac19d5f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
@@ -51,6 +51,7 @@ public class SessionOptions {
     private String[] offeredCapabilities;
     private String[] desiredCapabilities;
     private Map<String, Object> properties;
+    private NextReceiverPolicy nextReceiverPolicy = ConnectionOptions.DEFAULT_NEXT_RECEIVER_POLICY;
 
     /**
      * Create a new {@link SessionOptions} instance configured with default configuration settings.
@@ -92,6 +93,7 @@ public class SessionOptions {
         other.requestTimeout(requestTimeout);
         other.incomingCapacity(incomingCapacity);
         other.outgoingCapacity(outgoingCapacity);
+        other.defaultNextReceiverPolicy(nextReceiverPolicy);
 
         if (offeredCapabilities != null) {
             other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
@@ -398,4 +400,25 @@ public class SessionOptions {
         this.outgoingCapacity = outgoingCapacity;
         return this;
     }
+
+    /**
+     * @return the configured default next receiver policy for a session created using these options.
+     */
+    public NextReceiverPolicy defaultNextReceiverPolicy() {
+        return nextReceiverPolicy;
+    }
+
+    /**
+     * Configures the default next receiver policy for the session created with these
+     * configuration options.
+     *
+     * @param policy
+     *      The default next receiver policy to assign to a new session.
+     *
+     * @return this {@link SessionOptions} instance.
+     */
+    public SessionOptions defaultNextReceiverPolicy(NextReceiverPolicy policy) {
+        this.nextReceiverPolicy = policy;
+        return this;
+    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index c6765dfc..a1546ebe 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -41,6 +41,7 @@ import org.apache.qpid.protonj2.client.ConnectionOptions;
 import org.apache.qpid.protonj2.client.DisconnectionEvent;
 import org.apache.qpid.protonj2.client.ErrorCondition;
 import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
 import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.ReconnectLocation;
@@ -487,6 +488,30 @@ public final class ClientConnection implements Connection {
         return request(this, result).send(message);
     }
 
+    @Override
+    public Receiver nextReceiver() throws ClientException {
+        checkClosedOrFailed();
+        return defaultSession().nextReceiver();
+    }
+
+    @Override
+    public Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException {
+        checkClosedOrFailed();
+        return defaultSession().nextReceiver(timeout, unit);
+    }
+
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException {
+        checkClosedOrFailed();
+        return defaultSession().nextReceiver(policy);
+    }
+
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException {
+        checkClosedOrFailed();
+        return defaultSession().nextReceiver(policy, timeout, unit);
+    }
+
     @Override
     public Map<String, Object> properties() throws ClientException {
         waitForOpenToComplete();
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index 7bb52e0c..852ee616 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -303,7 +303,7 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
         if (options.openTimeout() > 0) {
             executor.schedule(() -> {
                 if (!openFuture.isDone()) {
-                    immediateLinkShutdown(new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
+                    immediateLinkShutdown(new ClientOperationTimedOutException("Link open timed out waiting for remote to respond"));
                 }
             }, options.openTimeout(), TimeUnit.MILLISECONDS);
         }
@@ -462,7 +462,8 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
 
     protected boolean notClosedOrFailed(ClientFuture<?> request, ProtonType protonLink) {
         if (isClosed()) {
-            request.failed(new ClientIllegalStateException("The Sender was explicitly closed", failureCause));
+            request.failed(new ClientIllegalStateException(
+                String.format("The %s was explicitly closed", protonLink().isReceiver() ? "Receiver" : "Sender"), failureCause));
             return false;
         } else if (failureCause != null) {
             request.failed(failureCause);
@@ -486,7 +487,8 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
 
     protected void checkClosedOrFailed() throws ClientException {
         if (isClosed()) {
-            throw new ClientIllegalStateException("The Sender was explicitly closed", failureCause);
+            throw new ClientIllegalStateException(
+                String.format("The %s was explicitly closed", protonLink().isReceiver() ? "Receiver" : "Sender"), failureCause);
         } else if (failureCause != null) {
             throw failureCause;
         }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
new file mode 100644
index 00000000..a6f04b5c
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import java.security.SecureRandom;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
+import org.apache.qpid.protonj2.client.Receiver;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+
+/**
+ * Implements the various strategies of selecting the next receiver with pending
+ * messages and/or waiting for new incoming messages.
+ */
+final class ClientNextReceiverSelector {
+
+    private static final String LAST_RETURNED_STATE_KEY = "Last_Returned_State";
+
+    private final ArrayDeque<ClientFuture<Receiver>> pending = new ArrayDeque<>();
+    private final ClientSession session;
+    private SecureRandom srand;
+
+    public ClientNextReceiverSelector(ClientSession session) {
+        this.session = session;
+
+        // Same processing as reconnect for session delivery tapping
+        handleReconnect();
+    }
+
+    /**
+     * Completes the request with the receiver chosen by the policy, or fails it for an unknown
+     * policy. When nothing is selected the request waits for the next incoming delivery and,
+     * if timeout (milliseconds) is positive, completes with null once that time has elapsed.
+     */
+    public void nextReceiver(ClientFuture<Receiver> request, NextReceiverPolicy policy, long timeout) {
+        Objects.requireNonNull(policy, "The next receiver selection policy cannot be null");
+        final ClientReceiver result;
+        switch (policy) {
+            case ROUND_ROBIN:
+                result = selectNextAvailable();
+                break;
+            case FIRST_AVAILABLE:
+                result = selectFirstAvailable();
+                break;
+            case LARGEST_BACKLOG:
+                result = selectLargestBacklog();
+                break;
+            case SMALLEST_BACKLOG:
+                result = selectSmallestBacklog();
+                break;
+            case RANDOM:
+                result = selectRandomReceiver();
+                break;
+            default:
+                request.failed(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
+                return; // Never park an already failed request in the pending queue
+        }
+        if (result == null) {
+            pending.add(request); // Wait for the next incoming delivery
+            if (timeout > 0) {
+                session.getScheduler().schedule(() -> {
+                    if (!request.isDone()) {
+                        pending.remove(request);
+                        request.complete(null);
+                    }
+                }, timeout, TimeUnit.MILLISECONDS);
+            }
+        } else {
+            // Record the receiver handed out, whichever policy chose it, so that a later
+            // ROUND_ROBIN request resumes from the receiver that follows this one.
+            result.protonLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, result);
+            request.complete(result);
+        }
+    }
+
+    public void handleReconnect() {
+        session.getProtonSession().deliveryReadHandler(this::deliveryReadHandler);
+    }
+
+    /**
+     * Fails every pending next receiver request and then empties the pending queue.
+     * <p>
+     * The error handed to the waiting callers depends on the session state: an
+     * explicitly closed session produces a {@link ClientIllegalStateException} that
+     * wraps the session failure cause (if any), a failed session reports its failure
+     * cause directly, and otherwise a generic {@link ClientIllegalStateException} is
+     * used.
+     */
+    public void handleShutdown() {
+        final ClientException cause;
+
+        if (session.isClosed()) {
+            cause = new ClientIllegalStateException("The Session was explicitly closed", session.getFailureCause());
+        } else {
+            final ClientException sessionFailure = session.getFailureCause();
+
+            cause = sessionFailure != null ? sessionFailure :
+                new ClientIllegalStateException("The session was closed without a specific error being provided");
+        }
+
+        pending.forEach(waiting -> waiting.failed(cause));
+        pending.clear();
+    }
+
+    /**
+     * Selects at random one of the session's receivers that currently has at least
+     * one queued delivery.
+     * <p>
+     * Only links whose linked resource is a {@link ClientReceiver} are considered.
+     * The random source is created lazily, and only once there is actually something
+     * to choose from.
+     *
+     * @return a randomly chosen receiver with queued deliveries, or null if no
+     *         receiver in this session has any.
+     */
+    private ClientReceiver selectRandomReceiver() {
+        final ArrayList<ClientReceiver> candidates = session.getProtonSession().receivers().stream()
+                .filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
+                               r.getLinkedResource(ClientReceiver.class).queuedDeliveries() > 0)
+                .map((r) -> (ClientReceiver) r.getLinkedResource())
+                .collect(Collectors.toCollection(ArrayList::new));
+
+        if (candidates.isEmpty()) {
+            return null; // Nothing to pick, so don't create the random source needlessly.
+        }
+
+        if (srand == null) {
+            srand = new SecureRandom();
+        }
+
+        // One random index gives the same choice as shuffling the entire list and
+        // taking its head, without reordering every element.
+        return candidates.get(srand.nextInt(candidates.size()));
+    }
+
+    /**
+     * Round robin selection: returns the first receiver that follows the previously
+     * returned receiver (in the session's receiver iteration order) and has queued
+     * deliveries.
+     * <p>
+     * When there is no previously returned receiver, when it has been locally closed
+     * or detached, or when no receiver after it has deliveries, selection falls back
+     * to {@link #selectFirstAvailable()}, which wraps the search around to the start.
+     * A stale previously returned receiver is cleared from the session attachments.
+     *
+     * @return the next receiver with queued deliveries, or null if none has any.
+     */
+    private ClientReceiver selectNextAvailable() {
+        final ClientReceiver lastReceiver = session.getProtonSession().getAttachments().get(LAST_RETURNED_STATE_KEY);
+
+        ClientReceiver result = null;
+
+        if (lastReceiver != null && !lastReceiver.protonReceiver.isLocallyClosedOrDetached()) {
+            boolean foundLast = false;
+            for (org.apache.qpid.protonj2.engine.Receiver link : session.getProtonSession().receivers()) {
+                if (link.getLinkedResource() instanceof ClientReceiver) {
+                    if (foundLast) {
+                        final ClientReceiver candidate = link.getLinkedResource();
+                        if (candidate.queuedDeliveries() > 0) {
+                            result = candidate;
+                            // Stop at the first match; scanning on would hand back the
+                            // last receiver with deliveries and skip those in between.
+                            break;
+                        }
+                    } else {
+                        foundLast = link.getLinkedResource() == lastReceiver;
+                    }
+                }
+            }
+        } else {
+            // The remembered receiver is unusable, forget it so selection restarts.
+            session.getProtonSession().getAttachments().set(LAST_RETURNED_STATE_KEY, null);
+        }
+
+        return result != null ? result : selectFirstAvailable();
+    }
+
+    /**
+     * Walks the session's receivers in iteration order and returns the first
+     * {@link ClientReceiver} that has at least one queued delivery.
+     *
+     * @return the first receiver with queued deliveries, or null if none has any.
+     */
+    private ClientReceiver selectFirstAvailable() {
+        for (org.apache.qpid.protonj2.engine.Receiver link : session.getProtonSession().receivers()) {
+            if (link.getLinkedResource() instanceof ClientReceiver) {
+                final ClientReceiver candidate = link.getLinkedResource();
+                if (candidate.queuedDeliveries() > 0) {
+                    return candidate;
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Finds the {@link ClientReceiver} in this session holding the most queued
+     * deliveries. Receivers with nothing queued are never returned, and when several
+     * share the largest backlog the one encountered first wins.
+     *
+     * @return the receiver with the largest backlog, or null if none has deliveries.
+     */
+    private ClientReceiver selectLargestBacklog() {
+        ClientReceiver largest = null;
+        long largestBacklog = 0;
+
+        for (org.apache.qpid.protonj2.engine.Receiver link : session.getProtonSession().receivers()) {
+            if (!(link.getLinkedResource() instanceof ClientReceiver)) {
+                continue;
+            }
+
+            final ClientReceiver candidate = link.getLinkedResource();
+            final long backlog = candidate.queuedDeliveries();
+
+            // Starting from zero with a strict comparison both skips empty receivers
+            // and keeps the earliest receiver when backlogs are equal.
+            if (backlog > largestBacklog) {
+                largest = candidate;
+                largestBacklog = backlog;
+            }
+        }
+
+        return largest;
+    }
+
+    /**
+     * Finds the {@link ClientReceiver} in this session holding the fewest queued
+     * deliveries while still having at least one. When several share the smallest
+     * backlog the one encountered first wins.
+     *
+     * @return the receiver with the smallest non-empty backlog, or null if none has
+     *         deliveries.
+     */
+    private ClientReceiver selectSmallestBacklog() {
+        ClientReceiver smallest = null;
+        long smallestBacklog = 0;
+
+        for (org.apache.qpid.protonj2.engine.Receiver link : session.getProtonSession().receivers()) {
+            if (!(link.getLinkedResource() instanceof ClientReceiver)) {
+                continue;
+            }
+
+            final ClientReceiver candidate = link.getLinkedResource();
+            final long backlog = candidate.queuedDeliveries();
+
+            // Empty receivers are ignored; the strict comparison keeps the earliest
+            // receiver when backlogs are equal.
+            if (backlog > 0 && (smallest == null || backlog < smallestBacklog)) {
+                smallest = candidate;
+                smallestBacklog = backlog;
+            }
+        }
+
+        return smallest;
+    }
+
+    /**
+     * Delivery read event handler: when a complete, non-aborted delivery arrives on a
+     * {@link ClientReceiver} link while callers are waiting, the oldest pending
+     * request is completed with that receiver.
+     *
+     * @param delivery the incoming delivery that was just read.
+     */
+    private void deliveryReadHandler(IncomingDelivery delivery) {
+        // Nothing to do unless someone is waiting and the delivery arrived complete.
+        if (pending.isEmpty() || delivery.isPartial() || delivery.isAborted()) {
+            return;
+        }
+
+        // Only normal client receivers take part in next receiver handling, stream
+        // receiver types and other linked resources are ignored.
+        if (!(delivery.getLink().getLinkedResource() instanceof ClientReceiver)) {
+            return;
+        }
+
+        final ClientReceiver target = delivery.getLink().getLinkedResource();
+
+        // Remember what was handed out so Round Robin dispatch resumes after it.
+        delivery.getLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, target);
+
+        pending.poll().complete(target);
+    }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 91c7270a..1f7cafd4 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -42,7 +42,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
     private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private final ReceiverOptions options;
-    private final FifoDeliveryQueue messageQueue;
+    private final FifoDeliveryQueue deliveryQueue;
 
     ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
         super(session, receiverId, options, receiver);
@@ -53,8 +53,8 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
             protonReceiver.addCredit(options.creditWindow());
         }
 
-        messageQueue = new FifoDeliveryQueue(options.creditWindow());
-        messageQueue.start();
+        deliveryQueue = new FifoDeliveryQueue(options.creditWindow());
+        deliveryQueue.start();
     }
 
     @Override
@@ -67,7 +67,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         checkClosedOrFailed();
 
         try {
-            ClientDelivery delivery = messageQueue.dequeue(units.toMillis(timeout));
+            ClientDelivery delivery = deliveryQueue.dequeue(units.toMillis(timeout));
             if (delivery != null) {
                 if (options.autoAccept()) {
                     disposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
@@ -91,7 +91,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
     public Delivery tryReceive() throws ClientException {
         checkClosedOrFailed();
 
-        Delivery delivery = messageQueue.dequeueNoWait();
+        Delivery delivery = deliveryQueue.dequeueNoWait();
         if (delivery != null) {
             if (options.autoAccept()) {
                 delivery.disposition(org.apache.qpid.protonj2.client.DeliveryState.accepted(), options.autoSettle());
@@ -107,7 +107,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
 
     @Override
     public long queuedDeliveries() {
-        return messageQueue.size();
+        return deliveryQueue.size();
     }
 
     @Override
@@ -182,8 +182,8 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         }
 
         if (!delivery.isPartial()) {
-            LOG.trace("Receiver {} has incoming Message(s).", linkId);
-            messageQueue.enqueue(new ClientDelivery(this, delivery));
+            LOG.trace("{} has incoming Message(s).", this);
+            deliveryQueue.enqueue(new ClientDelivery(this, delivery));
         } else {
             delivery.claimAvailableBytes();
         }
@@ -197,7 +197,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         if (creditWindow > 0) {
             int currentCredit = protonReceiver.getCredit();
             if (currentCredit <= creditWindow * 0.5) {
-                int potentialPrefetch = currentCredit + messageQueue.size();
+                int potentialPrefetch = currentCredit + deliveryQueue.size();
 
                 if (potentialPrefetch <= creditWindow * 0.7) {
                     int additionalCredit = creditWindow - potentialPrefetch;
@@ -213,11 +213,17 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         }
     }
 
+    /**
+     * Receiver specific portion of local close handling: stops the delivery queue
+     * and then discards anything still held in it.
+     */
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        deliveryQueue.stop();  // Ensure blocked receivers are all unblocked.
+        deliveryQueue.clear(); // Drop deliveries that were queued but never consumed.
+    }
+
     @Override
     protected void recreateLinkForReconnect() {
-        int previousCredit = protonReceiver.getCredit() + messageQueue.size();
+        int previousCredit = protonReceiver.getCredit() + deliveryQueue.size();
 
-        messageQueue.clear();  // Prefetched messages should be discarded.
+        deliveryQueue.clear();  // Prefetched messages should be discarded.
 
         if (drainingFuture != null) {
             drainingFuture.complete(this);
@@ -234,10 +240,4 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
         protonReceiver.setLinkedResource(this);
         protonReceiver.addCredit(previousCredit);
     }
-
-    @Override
-    protected void linkSpecificLocalCloseHandler() {
-        messageQueue.stop();  // Ensure blocked receivers are all unblocked.
-        messageQueue.clear();
-    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
index 76df24e5..284a78a5 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -48,6 +48,7 @@ public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverT
         super(session, linkId, options);
 
         this.protonReceiver = protonReceiver;
+        this.protonReceiver.setLinkedResource(self());
     }
 
     @Override
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
index 9455fcb7..c38beab6 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.function.Supplier;
 
 import org.apache.qpid.protonj2.client.ErrorCondition;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
 import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.Sender;
@@ -52,7 +54,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ClientSession implements Session {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ClientSession.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private static final long INFINITE = -1;
 
@@ -70,6 +72,7 @@ public class ClientSession implements Session {
     private final ClientSenderBuilder senderBuilder;
     private final ClientReceiverBuilder receiverBuilder;
 
+    private ClientNextReceiverSelector nextReceiverSelector;
     private volatile int closed;
     private volatile ClientException failureCause;
     private ClientTransactionContext txnContext = NO_OP_TXN_CONTEXT;
@@ -351,6 +354,38 @@ public class ClientSession implements Session {
         return connection.request(this, rollbackFuture);
     }
 
+    /**
+     * Requests the next receiver using the default next receiver policy configured
+     * in the session options, with no timeout on the wait.
+     *
+     * @return the receiver produced by the selection.
+     *
+     * @throws ClientException if the session is closed or failed, or the request fails.
+     */
+    @Override
+    public Receiver nextReceiver() throws ClientException {
+        // Pass the no-timeout sentinel in milliseconds so it reaches the selector
+        // intact; -1 expressed in microseconds is truncated to 0 by toMillis().
+        return nextReceiver(options.defaultNextReceiverPolicy(), INFINITE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Requests the next receiver using the default next receiver policy configured
+     * in the session options and the caller supplied timeout.
+     *
+     * @param timeout the amount of time to wait, expressed in the given unit.
+     * @param unit the unit the timeout value is expressed in.
+     *
+     * @return the receiver produced by the selection.
+     *
+     * @throws ClientException if the session is closed or failed, or the request fails.
+     */
+    @Override
+    public Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException {
+        return nextReceiver(options.defaultNextReceiverPolicy(), timeout, unit);
+    }
+
+    /**
+     * Requests the next receiver using the caller supplied policy, with no timeout
+     * on the wait.
+     *
+     * @param policy the policy used to choose among receivers with queued deliveries.
+     *
+     * @return the receiver produced by the selection.
+     *
+     * @throws ClientException if the session is closed or failed, or the request fails.
+     */
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException {
+        // Pass the no-timeout sentinel in milliseconds so it reaches the selector
+        // intact; -1 expressed in microseconds is truncated to 0 by toMillis().
+        return nextReceiver(policy, INFINITE, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Requests the next receiver using the given policy and timeout. The selection
+     * itself is handed to the session's serializer; the calling thread then waits on
+     * the resulting future through the connection's request handling.
+     *
+     * @param policy the policy used to choose among receivers with queued deliveries.
+     * @param timeout the amount of time to wait, expressed in the given unit.
+     * @param unit the unit the timeout value is expressed in.
+     *
+     * @return the receiver produced by the selection.
+     *
+     * @throws ClientException if the session is closed or failed, or the request fails.
+     */
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException {
+        checkClosedOrFailed();
+
+        final ClientFuture<Receiver> selection = getFutureFactory().createFuture();
+        final Runnable selectionTask = () -> {
+            try {
+                // State may have changed between the caller's check and this task running.
+                checkClosedOrFailed();
+
+                final ClientNextReceiverSelector selector = getNextReceiverSelector();
+                selector.nextReceiver(selection, policy, unit.toMillis(timeout));
+            } catch (Throwable error) {
+                selection.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+            }
+        };
+
+        serializer.execute(selectionTask);
+
+        return connection.request(this, selection);
+    }
+
     //----- Internal resource open APIs expected to be called from the connection event loop
 
     ClientReceiver internalOpenReceiver(String address, ReceiverOptions receiverOptions) throws ClientException {
@@ -488,6 +523,14 @@ public class ClientSession implements Session {
         }
     }
 
+    /**
+     * Returns the next receiver selector for this session, creating it on first use.
+     *
+     * @return the selector instance bound to this session.
+     */
+    private ClientNextReceiverSelector getNextReceiverSelector() {
+        ClientNextReceiverSelector selector = nextReceiverSelector;
+
+        if (selector == null) {
+            selector = new ClientNextReceiverSelector(this);
+            nextReceiverSelector = selector;
+        }
+
+        return selector;
+    }
+
     //----- Handle Events from the Proton Session
 
     private void handleLocalOpen(org.apache.qpid.protonj2.engine.Session session) {
@@ -550,6 +593,10 @@ public class ClientSession implements Session {
             protonSession.close();
             protonSession = configureSession(ClientSessionBuilder.recreateSession(connection, protonSession, options));
 
+            if (nextReceiverSelector != null) {
+                nextReceiverSelector.handleReconnect();
+            }
+
             open();
         } else {
             final Connection connection = engine.connection();
@@ -580,6 +627,10 @@ public class ClientSession implements Session {
         } catch (Exception ignore) {
         }
 
+        if (nextReceiverSelector != null) {
+            nextReceiverSelector.handleShutdown();
+        }
+
         if (failureCause != null) {
             openFuture.failed(failureCause);
         } else {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
index 23f3a3d1..c212cb7b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
@@ -76,6 +76,7 @@ final class ClientSessionBuilder {
                     sessionOptions.requestTimeout(connectionOptions.requestTimeout());
                     sessionOptions.sendTimeout(connectionOptions.sendTimeout());
                     sessionOptions.drainTimeout(connectionOptions.drainTimeout());
+                    sessionOptions.defaultNextReceiverPolicy(connectionOptions.defaultNextReceiverPolicy());
                 }
 
                 defaultSessionOptions = sessionOptions;
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
index e13863d8..5921f90b 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
@@ -29,7 +29,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.ClientOptions;
@@ -37,6 +39,7 @@ import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
 import org.apache.qpid.protonj2.client.ErrorCondition;
 import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
 import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.Sender;
@@ -48,11 +51,13 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIOException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
 import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.SourceMatcher;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
 import org.apache.qpid.protonj2.types.transport.AMQPHeader;
 import org.apache.qpid.protonj2.types.transport.AmqpError;
 import org.apache.qpid.protonj2.types.transport.ConnectionError;
@@ -1606,6 +1611,139 @@ public class ConnectionTest extends ImperativeClientTestCase {
         }
     }
 
+    /**
+     * Verifies that a policy passed directly to nextReceiver takes precedence over
+     * the default policy set in the connection options: the connection is configured
+     * with SMALLEST_BACKLOG, yet a call requesting LARGEST_BACKLOG must return the
+     * receiver holding two queued deliveries.
+     */
+    @Test
+    public void testUserSpeicifedNextReceiverPolicyOverridesConfiguration() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            // One delivery for the first link, two for the second and one for the third.
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = connection.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = connection.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            // The explicit policy must win over the configured SMALLEST_BACKLOG default.
+            Receiver next = connection.nextReceiver(NextReceiverPolicy.LARGEST_BACKLOG);
+            assertSame(receiver2, next);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Runs the shared blocked nextReceiver / connection close scenario with the
+     * RANDOM policy configured as the connection default.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterConnectionClosedRandom() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.RANDOM);
+    }
+
+    /**
+     * Runs the shared blocked nextReceiver / connection close scenario with the
+     * LARGEST_BACKLOG policy configured as the connection default.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterConnectionClosedLargestBacklog() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LARGEST_BACKLOG);
+    }
+
+    /**
+     * Runs the shared blocked nextReceiver / connection close scenario with the
+     * SMALLEST_BACKLOG policy configured as the connection default.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterConnectionClosedSmallestBacklog() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SMALLEST_BACKLOG);
+    }
+
+    /**
+     * Runs the shared blocked nextReceiver / connection close scenario with the
+     * FIRST_AVAILABLE policy configured as the connection default.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterConnectionClosedFirstAvailable() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FIRST_AVAILABLE);
+    }
+
+    /**
+     * Shared scenario: a background task calls {@code connection.nextReceiver()} on
+     * a connection that has no receivers, the connection is then closed
+     * asynchronously, and the task is expected to finish with a
+     * {@link ClientConnectionRemotelyClosedException}.
+     *
+     * @param policy the policy configured as the connection's default next receiver policy.
+     */
+    public void doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy) throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            // started: the background task has begun; done: it has finished either way.
+            final CountDownLatch started = new CountDownLatch(1);
+            final CountDownLatch done = new CountDownLatch(1);
+            final AtomicReference<Exception> error = new AtomicReference<>();
+
+            final Client container = Client.create();
+            final ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+            // No receivers are opened, so this call has nothing to select from.
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    started.countDown();
+                    connection.nextReceiver();
+                } catch (ClientException e) {
+                    error.set(e);
+                } finally {
+                    done.countDown();
+                }
+            });
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            // NOTE(review): started is counted down before nextReceiver() is invoked, so
+            // the close below may race with the call actually being queued - confirm
+            // the assertion holds in both orderings.
+            assertTrue(started.await(10, TimeUnit.SECONDS));
+
+            connection.closeAsync();
+
+            assertTrue(done.await(10, TimeUnit.SECONDS));
+            assertTrue(error.get() instanceof ClientConnectionRemotelyClosedException);
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
     @Disabled("Disabled due to requirement of hard coded port")
     @Test
     public void testLocalPortOption() throws Exception {
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 8f88e35b..f18a46db 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2649,124 +2649,120 @@ public class ReceiverTest extends ImperativeClientTestCase {
        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
 
        try (ProtonTestServer peer = new ProtonTestServer()) {
-          peer.expectSASLAnonymousConnect();
-          peer.expectOpen().respond();
-          peer.expectBegin().respond();
-          peer.expectAttach().ofReceiver().respond();
-          peer.expectFlow().withLinkCredit(10);
-          for (int i = 0; i < 10; ++i) {
-             peer.remoteTransfer().withDeliveryId(i)
-                                  .withMore(false)
-                                  .withMessageFormat(0)
-                                  .withPayload(payload).queue();
-          }
-          peer.start();
-
-          URI remoteURI = peer.getServerURI();
-
-          LOG.info("Test started, peer listening on: {}", remoteURI);
-
-          Client container = Client.create();
-          Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
-
-          ReceiverOptions options = new ReceiverOptions();
-          options.autoAccept(autoAccept);
-          options.creditWindow(10);
-
-          Receiver receiver = connection.openReceiver("test-receiver", options);
-
-          Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
-
-          peer.waitForScriptToComplete();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(0);
-              peer.expectDisposition().withFirst(1);
-          }
-
-          // Consume messages 1 and 2 which should not provoke credit replenishment
-          // as there are still 8 outstanding which is above the 70% mark
-          assertNotNull(receiver.receive()); // #1
-          assertNotNull(receiver.receive()); // #2
-
-          peer.waitForScriptToComplete();
-          peer.expectAttach().ofSender().respond();
-          peer.expectDetach().respond();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(2);
-          }
-          peer.expectFlow().withLinkCredit(3);
-
-          // Ensure that no additional frames from last receive overlap with this one
-          connection.openSender("test").openFuture().get().close();
-
-          // Now consume message 3 which will trip the replenish barrier and the
-          // credit should be updated to reflect that we still have 7 queued
-          assertNotNull(receiver.receive());  // #3
-
-          peer.waitForScriptToComplete();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(3);
-              peer.expectDisposition().withFirst(4);
-          }
-
-          // Consume messages 4 and 5 which should not provoke credit replenishment
-          // as there are still 5 outstanding plus the credit we sent last time
-          // which is above the 70% mark
-          assertNotNull(receiver.receive()); // #4
-          assertNotNull(receiver.receive()); // #5
-
-          peer.waitForScriptToComplete();
-          peer.expectAttach().ofSender().respond();
-          peer.expectDetach().respond();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(5);
-          }
-          peer.expectFlow().withLinkCredit(6);
-
-          // Ensure that no additional frames from last receive overlap with this one
-          connection.openSender("test").openFuture().get().close();
-
-          // Consume number 6 which means we only have 4 outstanding plus the three
-          // that we sent last time we flowed which is 70% of possible prefetch so
-          // we should flow to top off credit which would be 6 since we have four
-          // still pending
-          assertNotNull(receiver.receive()); // #6
-
-          peer.waitForScriptToComplete();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(6);
-              peer.expectDisposition().withFirst(7);
-          }
-
-          // Consume deliveries 7 and 8 which should not flow as we should be
-          // above the threshold of 70% since we would now have 2 outstanding
-          // and 6 credits on the link
-          assertNotNull(receiver.receive()); // #7
-          assertNotNull(receiver.receive()); // #8
-
-          peer.waitForScriptToComplete();
-          if (autoAccept)
-          {
-              peer.expectDisposition().withFirst(8);
-              peer.expectDisposition().withFirst(9);
-          }
-
-          // Now consume 9 and 10 but we still shouldn't flow more credit because
-          // the link credit is above the 50% mark for overall credit windowing.
-          assertNotNull(receiver.receive()); // #9
-          assertNotNull(receiver.receive()); // #10
-
-          peer.waitForScriptToComplete();
-          peer.expectClose().respond();
-
-          connection.close();
-
-          peer.waitForScriptToComplete();
-       }
+           peer.expectSASLAnonymousConnect();
+           peer.expectOpen().respond();
+           peer.expectBegin().respond();
+           peer.expectAttach().ofReceiver().respond();
+           peer.expectFlow().withLinkCredit(10);
+           for (int i = 0; i < 10; ++i) {
+               peer.remoteTransfer().withDeliveryId(i)
+                                    .withMore(false)
+                                    .withMessageFormat(0)
+                                    .withPayload(payload).queue();
+           }
+           peer.start();
+
+           URI remoteURI = peer.getServerURI();
+
+           LOG.info("Test started, peer listening on: {}", remoteURI);
+
+           Client container = Client.create();
+           Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+           ReceiverOptions options = new ReceiverOptions();
+           options.autoAccept(autoAccept);
+           options.creditWindow(10);
+
+           Receiver receiver = connection.openReceiver("test-receiver", options);
+
+           Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
+
+           peer.waitForScriptToComplete();
+           if (autoAccept)
+           {
+               peer.expectDisposition().withFirst(0);
+               peer.expectDisposition().withFirst(1);
+           }
+
+           // Consume messages 1 and 2 which should not provoke credit replenishment
+           // as there are still 8 outstanding which is above the 70% mark
+           assertNotNull(receiver.receive()); // #1
+           assertNotNull(receiver.receive()); // #2
+
+           peer.waitForScriptToComplete();
+           peer.expectAttach().ofSender().respond();
+           peer.expectDetach().respond();
+           if (autoAccept)
+           {
+               peer.expectDisposition().withFirst(2);
+           }
+           peer.expectFlow().withLinkCredit(3);
+
+           // Ensure that no additional frames from last receive overlap with this one
+           connection.openSender("test").openFuture().get().close();
+
+           // Now consume message 3 which will trip the replenish barrier and the
+           // credit should be updated to reflect that we still have 7 queued
+           assertNotNull(receiver.receive());  // #3
+
+           peer.waitForScriptToComplete();
+           if (autoAccept) {
+               peer.expectDisposition().withFirst(3);
+               peer.expectDisposition().withFirst(4);
+           }
+
+           // Consume messages 4 and 5 which should not provoke credit replenishment
+           // as there are still 5 outstanding plus the credit we sent last time
+           // which is above the 70% mark
+           assertNotNull(receiver.receive()); // #4
+           assertNotNull(receiver.receive()); // #5
+
+           peer.waitForScriptToComplete();
+           peer.expectAttach().ofSender().respond();
+           peer.expectDetach().respond();
+           if (autoAccept) {
+               peer.expectDisposition().withFirst(5);
+           }
+           peer.expectFlow().withLinkCredit(6);
+
+           // Ensure that no additional frames from last receive overlap with this one
+           connection.openSender("test").openFuture().get().close();
+
+           // Consume number 6 which means we only have 4 outstanding plus the three
+           // that we sent last time we flowed which is 70% of possible prefetch so
+           // we should flow to top off credit which would be 6 since we have four
+           // still pending
+           assertNotNull(receiver.receive()); // #6
+
+           peer.waitForScriptToComplete();
+           if (autoAccept) {
+               peer.expectDisposition().withFirst(6);
+               peer.expectDisposition().withFirst(7);
+           }
+
+           // Consume deliveries 7 and 8 which should not flow as we should be
+           // above the threshold of 70% since we would now have 2 outstanding
+           // and 6 credits on the link
+           assertNotNull(receiver.receive()); // #7
+           assertNotNull(receiver.receive()); // #8
+
+           peer.waitForScriptToComplete();
+           if (autoAccept) {
+               peer.expectDisposition().withFirst(8);
+               peer.expectDisposition().withFirst(9);
+           }
+
+           // Now consume 9 and 10 but we still shouldn't flow more credit because
+           // the link credit is above the 50% mark for overall credit windowing.
+           assertNotNull(receiver.receive()); // #9
+           assertNotNull(receiver.receive()); // #10
+
+           peer.waitForScriptToComplete();
+           peer.expectClose().respond();
+
+           connection.close();
+
+           peer.waitForScriptToComplete();
+        }
     }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
index 8da2b6f6..dfefbf6d 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
@@ -17,19 +17,27 @@
 package org.apache.qpid.protonj2.client.impl;
 
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
 import org.apache.qpid.protonj2.client.Receiver;
+import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.Session;
 import org.apache.qpid.protonj2.client.SessionOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
@@ -256,4 +264,96 @@ class ReconnectSessionTest extends ImperativeClientTestCase {
             finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    /**
+     * Runs the reconnect "delivery arrives after nextReceiver is called" scenario
+     * with {@link NextReceiverPolicy#RANDOM} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesRandom() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.RANDOM;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the reconnect "delivery arrives after nextReceiver is called" scenario
+     * with {@link NextReceiverPolicy#LARGEST_BACKLOG} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesLargestBacklog() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.LARGEST_BACKLOG;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the reconnect "delivery arrives after nextReceiver is called" scenario
+     * with {@link NextReceiverPolicy#SMALLEST_BACKLOG} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.SMALLEST_BACKLOG;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the reconnect "delivery arrives after nextReceiver is called" scenario
+     * with {@link NextReceiverPolicy#FIRST_AVAILABLE} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesFirstAvailable() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.FIRST_AVAILABLE;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Verifies that a {@code Connection.nextReceiver()} call issued from a background
+     * thread completes once a delivery arrives from the second (reconnect) peer.
+     * <p>
+     * The first peer is scripted to accept the receiver attach and its initial flow and
+     * then drop after its last handler; the final peer is registered as a reconnect
+     * location, expects the same attach and flow, and later sends a single transfer.
+     *
+     * @param policy
+     *      the default next receiver policy configured on the connection options.
+     *
+     * @throws Exception if a peer script check or a client operation fails.
+     */
+    public void doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy) throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer firstPeer = new ProtonTestServer();
+             ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+            firstPeer.expectSASLAnonymousConnect();
+            firstPeer.expectOpen().respond();
+            firstPeer.expectBegin().respond();
+            firstPeer.expectAttach().ofReceiver().respond();
+            firstPeer.expectFlow().withLinkCredit(10);
+            firstPeer.dropAfterLastHandler();
+            firstPeer.start();
+
+            finalPeer.expectSASLAnonymousConnect();
+            finalPeer.expectOpen().respond();
+            finalPeer.expectBegin().respond();
+            finalPeer.expectAttach().ofReceiver().respond();
+            finalPeer.expectFlow().withLinkCredit(10);
+            finalPeer.start();
+
+            final URI firstURI = firstPeer.getServerURI();
+            final URI finalURI = finalPeer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", firstURI);
+
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ConnectionOptions options = new ConnectionOptions();
+            options.defaultNextReceiverPolicy(policy);
+            options.reconnectOptions().reconnectEnabled(true);
+            options.reconnectOptions().addReconnectLocation(finalURI.getHost(), finalURI.getPort());
+
+            Client container = Client.create();
+            Connection connection = container.connect(firstURI.getHost(), firstURI.getPort(), options);
+
+            // The nextReceiver call is started before any receiver has been opened.
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    Receiver receiver = connection.nextReceiver();
+                    Delivery delivery = receiver.receive();
+                    LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+                    done.countDown();
+                } catch (ClientException e) {
+                    // Report through the test logger; the latch is never released in this
+                    // case so the assertion on the latch below fails the test.
+                    LOG.error("Failed while waiting on the next receiver for a delivery", e);
+                }
+            });
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+            firstPeer.waitForScriptToComplete();
+            finalPeer.waitForScriptToComplete();
+
+            finalPeer.remoteTransfer().withHandle(0)
+                                      .withDeliveryId(0)
+                                      .withMore(false)
+                                      .withMessageFormat(0)
+                                      .withPayload(payload).later(15);
+
+            finalPeer.waitForScriptToComplete();
+
+            assertTrue(done.await(10, TimeUnit.SECONDS), "nextReceiver did not yield a delivery in time");
+
+            finalPeer.waitForScriptToComplete();
+            finalPeer.expectClose().respond();
+
+            connection.close();
+
+            finalPeer.waitForScriptToComplete();
+        }
+    }
 }
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
index c39a0e4b..475fa56b 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
@@ -18,6 +18,9 @@ package org.apache.qpid.protonj2.client.impl;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -25,13 +28,19 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.protonj2.client.Client;
 import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
 import org.apache.qpid.protonj2.client.ErrorCondition;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
+import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.SenderOptions;
 import org.apache.qpid.protonj2.client.Session;
@@ -40,7 +49,9 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIOException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
 import org.apache.qpid.protonj2.types.transport.AmqpError;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -589,4 +600,983 @@ public class SessionTest extends ImperativeClientTestCase {
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
         }
     }
+
+    /**
+     * Verifies that with a single receiver holding ten queued deliveries, each of ten
+     * {@code Connection.nextReceiver()} calls (connection default policy
+     * {@link NextReceiverPolicy#FIRST_AVAILABLE}) returns that same receiver and that a
+     * delivery can then be received from it.
+     */
+    @Test
+    public void testNextReceiverFromDefaultSessionReturnsSameReceiverForQueuedDeliveries() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            for (int i = 0; i < 10; ++i) {
+                peer.remoteTransfer().withDeliveryId(i)
+                                     .withMore(false)
+                                     .withMessageFormat(0)
+                                     .withPayload(payload).queue();
+            }
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions connOptions = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.FIRST_AVAILABLE);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connOptions);
+
+            // Credit window is disabled so the explicit addCredit produces the expected flow of 10.
+            ReceiverOptions options = new ReceiverOptions().creditWindow(0).autoAccept(false);
+            Receiver receiver = connection.openReceiver("test-receiver", options);
+            receiver.addCredit(10);
+
+            // Fail here with a clear message if the deliveries never show up rather than
+            // carrying on into the nextReceiver loop below.
+            assertTrue(Wait.waitFor(() -> receiver.queuedDeliveries() == 10),
+                       "Receiver should have ten queued deliveries");
+
+            peer.waitForScriptToComplete();
+
+            for (int i = 0; i < 10; ++i) {
+                Receiver nextReceiver = connection.nextReceiver();
+                assertSame(receiver, nextReceiver);
+                Delivery delivery = nextReceiver.receive();
+                assertNotNull(delivery);
+            }
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Checks that a timed {@code Connection.nextReceiver} call returns {@code null}
+     * when the peer sends no transfer to the only open receiver.
+     */
+    @Test
+    public void testNextReceiverTimesOut() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            // Script: connect, open, begin, a single receiver attach and its credit flow.
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.start();
+
+            final URI peerLocation = peer.getServerURI();
+            LOG.info("Test started, peer listening on: {}", peerLocation);
+
+            final Client client = Client.create();
+            final Connection connection = client.connect(peerLocation.getHost(), peerLocation.getPort());
+
+            connection.openReceiver("test-receiver").openFuture().get();
+            peer.waitForScriptToComplete();
+
+            // No transfer was scripted so the short wait is expected to expire.
+            final Receiver selected = connection.nextReceiver(10, TimeUnit.MILLISECONDS);
+            assertNull(selected);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testNextReceiverReturnsAllReceiversEventually() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+            ReceiverOptions options = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            connection.openReceiver("test-receiver1", options).openFuture().get();
+            connection.openReceiver("test-receiver2", options).openFuture().get();
+            connection.openReceiver("test-receiver3", options).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Receiver receiver1 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+            assertNotNull(receiver1.receive());
+            Receiver receiver2 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+            assertNotNull(receiver2.receive());
+            Receiver receiver3 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+            assertNotNull(receiver3.receive());
+
+            assertNotSame(receiver1, receiver2);
+            assertNotSame(receiver1, receiver3);
+            assertNotSame(receiver2, receiver3);
+
+            peer.waitForScriptToComplete();
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testConnectionOptionsConfiguresLargestBacklogNextReceiverPolicy() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.LARGEST_BACKLOG);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = connection.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = connection.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            Receiver next = connection.nextReceiver();
+            assertSame(next, receiver2);
+
+            peer.waitForScriptToComplete();
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testSessionOptionsConfiguresLargestBacklogNextReceiverPolicy() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.LARGEST_BACKLOG);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            Receiver next = session.nextReceiver();
+            assertSame(next, receiver2);
+
+            peer.waitForScriptToComplete();
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testUserSpeicifedNextReceiverPolicyOverridesConfiguration() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            Receiver next = session.nextReceiver(NextReceiverPolicy.LARGEST_BACKLOG);
+            assertSame(next, receiver2);
+
+            peer.waitForScriptToComplete();
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testSessionOptionsConfiguresSmallestBacklogNextReceiverPolicy() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(4)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(5)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 3);
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            Receiver next = session.nextReceiver();
+            assertSame(next, receiver3);
+
+            peer.waitForScriptToComplete();
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Runs the "delivery arrives after nextReceiver is called" scenario with
+     * {@link NextReceiverPolicy#ROUND_ROBIN} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesRoundRobin() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.ROUND_ROBIN;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the "delivery arrives after nextReceiver is called" scenario with
+     * {@link NextReceiverPolicy#RANDOM} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesRandom() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.RANDOM;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the "delivery arrives after nextReceiver is called" scenario with
+     * {@link NextReceiverPolicy#LARGEST_BACKLOG} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesLargestBacklog() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.LARGEST_BACKLOG;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the "delivery arrives after nextReceiver is called" scenario with
+     * {@link NextReceiverPolicy#SMALLEST_BACKLOG} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.SMALLEST_BACKLOG;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Runs the "delivery arrives after nextReceiver is called" scenario with
+     * {@link NextReceiverPolicy#FIRST_AVAILABLE} as the connection default policy.
+     */
+    @Test
+    public void testNextReceiverCompletesAfterDeliveryArrivesFirstAvailable() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.FIRST_AVAILABLE;
+
+        doTestNextReceiverCompletesAfterDeliveryArrives(policy);
+    }
+
+    /**
+     * Verifies that a {@code Connection.nextReceiver()} call issued from a background
+     * thread while no delivery is available completes once the peer later sends a
+     * transfer to the single open receiver.
+     *
+     * @param policy
+     *      the default next receiver policy configured on the connection options.
+     *
+     * @throws Exception if a peer script check or a client operation fails.
+     */
+    public void doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy) throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            final CountDownLatch done = new CountDownLatch(1);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    Receiver receiver = connection.nextReceiver();
+                    Delivery delivery = receiver.receive();
+                    LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+                    done.countDown();
+                } catch (ClientException e) {
+                    // Report through the test logger; the latch is never released in this
+                    // case so the assertion on the latch below fails the test.
+                    LOG.error("Failed while waiting on the next receiver for a delivery", e);
+                }
+            });
+
+            // The transfer is scheduled with a delay rather than queued up front.
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).later(15);
+
+            peer.waitForScriptToComplete();
+
+            assertTrue(done.await(10, TimeUnit.SECONDS), "nextReceiver did not yield a delivery in time");
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Delegates to {@code doTestNextReceiverThrowsAfterSessionClosed} using the
+     * {@link NextReceiverPolicy#ROUND_ROBIN} policy.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterSessionClosedRoundRobin() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.ROUND_ROBIN;
+
+        doTestNextReceiverThrowsAfterSessionClosed(policy);
+    }
+
+    /**
+     * Delegates to {@code doTestNextReceiverThrowsAfterSessionClosed} using the
+     * {@link NextReceiverPolicy#RANDOM} policy.
+     */
+    @Test
+    public void testNextReceiverThrowsAfterSessionClosedRandom() throws Exception {
+        final NextReceiverPolicy policy = NextReceiverPolicy.RANDOM;
+
+        doTestNextReceiverThrowsAfterSessionClosed(policy);
+    }
+
+    @Test
+    public void testNextReceiverThrowsAfterSessionClosedLargestBacklog() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LARGEST_BACKLOG);
+    }
+
+    @Test
+    public void testNextReceiverThrowsAfterSessionClosedSmallestBacklog() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SMALLEST_BACKLOG);
+    }
+
+    @Test
+    public void testNextReceiverThrowsAfterSessionClosedFirstAvailable() throws Exception {
+        doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FIRST_AVAILABLE);
+    }
+
+    public void doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy) throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            final CountDownLatch started = new CountDownLatch(1);
+            final CountDownLatch done = new CountDownLatch(1);
+            final AtomicReference<Exception> error = new AtomicReference<>();
+
+            final Client container = Client.create();
+            final ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+            final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            final Session session = connection.openSession().openFuture().get();
+
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    started.countDown();
+                    session.nextReceiver();
+                } catch (ClientException e) {
+                    error.set(e);
+                } finally {
+                    done.countDown();
+                }
+            });
+
+            peer.waitForScriptToComplete();
+
+            assertTrue(started.await(10, TimeUnit.SECONDS));
+
+            peer.expectEnd().respond();
+
+            session.closeAsync();
+
+            assertTrue(done.await(10, TimeUnit.SECONDS));
+            assertTrue(error.get() instanceof ClientIllegalStateException);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateRoundRobin() throws Exception {
+        doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.ROUND_ROBIN);
+    }
+
+    @Test
+    public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateRandom() throws Exception {
+        doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.RANDOM);
+    }
+
+    @Test
+    public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateLargestBacklog() throws Exception {
+        doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.LARGEST_BACKLOG);
+    }
+
+    @Test
+    public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateSmallestBacklog() throws Exception {
+        doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.SMALLEST_BACKLOG);
+    }
+
+    @Test
+    public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateFirstAvailable() throws Exception {
+        doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.FIRST_AVAILABLE);
+    }
+
+    public void doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy policy) throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            final CountDownLatch started = new CountDownLatch(1);
+            final CountDownLatch done = new CountDownLatch(1);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+            ForkJoinPool.commonPool().execute(() -> {
+                try {
+                    started.countDown();
+                    Receiver receiver = connection.nextReceiver();
+                    Delivery delivery = receiver.receive();
+                    LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+                    done.countDown();
+                } catch (ClientException e) {
+                    e.printStackTrace();
+                }
+            });
+
+            assertTrue(started.await(10, TimeUnit.SECONDS));
+
+            connection.openFuture().get();
+
+            peer.waitForScriptToComplete();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue().afterDelay(10);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            assertTrue(done.await(10, TimeUnit.SECONDS));
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Checks that a session configured with the ROUND_ROBIN policy returns receiver2 and then
+     * receiver3 from successive {@code nextReceiver} calls while receiver1 has nothing queued.
+     */
+    @Test
+    public void testNextReceiverRoundRobinReturnsNextReceiverAfterLast() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            // Two transfers on handle 1 (receiver2) and one on handle 2 (receiver3).
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            // The connection default is RANDOM; the session options below request ROUND_ROBIN.
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            // Fail here, with a clear cause, if the scripted transfers were never queued.
+            assertTrue(Wait.waitFor(() -> receiver2.queuedDeliveries() == 2));
+            assertTrue(Wait.waitFor(() -> receiver3.queuedDeliveries() == 1));
+
+            assertEquals(0, receiver1.queuedDeliveries());
+
+            Receiver next = session.nextReceiver();
+            assertSame(receiver2, next);
+            next = session.nextReceiver();
+            assertSame(receiver3, next);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    @Test
+    public void testNextReceiverRoundRobinPolicyWrapsAround() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+            Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+            assertEquals(0, receiver1.queuedDeliveries());
+
+            Receiver next = session.nextReceiver();
+            assertSame(next, receiver2);
+            next = session.nextReceiver();
+            assertSame(next, receiver3);
+
+            peer.waitForScriptToComplete();
+
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).now();
+
+            Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+
+            next = session.nextReceiver();
+            assertSame(next, receiver1);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Checks that when the receiver last returned by the ROUND_ROBIN policy (receiver2) is closed,
+     * the next {@code nextReceiver} call returns receiver1 once it has a queued delivery.
+     */
+    @Test
+    public void testNextReceiverRoundRobinPolicyRestartsWhenLastReceiverClosed() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            // Two transfers on handle 1 (receiver2) and one on handle 2 (receiver3).
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(1)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(2)
+                                 .withDeliveryId(2)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            // The connection default is RANDOM; the session options below request ROUND_ROBIN.
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+            // Scripted ahead of time for the receiver close performed below.
+            peer.expectDetach().respond();
+
+            // Fail here, with a clear cause, if the scripted transfers were never queued.
+            assertTrue(Wait.waitFor(() -> receiver2.queuedDeliveries() == 2));
+            assertTrue(Wait.waitFor(() -> receiver3.queuedDeliveries() == 1));
+
+            assertEquals(0, receiver1.queuedDeliveries());
+
+            Receiver next = session.nextReceiver();
+            assertSame(receiver2, next);
+            next.close();
+
+            peer.waitForScriptToComplete();
+
+            // Give the first receiver (handle 0) a delivery after the last returned one is closed.
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(3)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).now();
+
+            assertTrue(Wait.waitFor(() -> receiver1.queuedDeliveries() == 1));
+
+            next = session.nextReceiver();
+            assertSame(receiver1, next);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
+
+    /**
+     * Checks that the ROUND_ROBIN policy passes over receivers with no queued deliveries: with
+     * deliveries queued only on receiver1 and receiver4, successive {@code nextReceiver} calls
+     * return receiver1 and then receiver4.
+     */
+    @Test
+    public void testNextReceiverRoundRobinPolicySkipsEmptyReceivers() throws Exception {
+        byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectSASLAnonymousConnect();
+            peer.expectOpen().respond();
+            peer.expectBegin().respond();
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            peer.expectAttach().ofReceiver().respond();
+            peer.expectFlow().withLinkCredit(10);
+            // One transfer on handle 0 (receiver1) and one on handle 3 (receiver4).
+            peer.remoteTransfer().withHandle(0)
+                                 .withDeliveryId(0)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.remoteTransfer().withHandle(3)
+                                 .withDeliveryId(1)
+                                 .withMore(false)
+                                 .withMessageFormat(0)
+                                 .withPayload(payload).queue();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            Client container = Client.create();
+            // The connection default is RANDOM; the session options below request ROUND_ROBIN.
+            ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+            SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+            Session session = connection.openSession(sessionOptions);
+
+            ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+            Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+            Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+            Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+            Receiver receiver4 = session.openReceiver("test-receiver4", receiverOptions).openFuture().get();
+
+            peer.waitForScriptToComplete();
+
+            // Fail here, with a clear cause, if the scripted transfers were never queued.
+            assertTrue(Wait.waitFor(() -> receiver1.queuedDeliveries() == 1));
+            assertTrue(Wait.waitFor(() -> receiver4.queuedDeliveries() == 1));
+
+            assertEquals(0, receiver2.queuedDeliveries());
+            assertEquals(0, receiver3.queuedDeliveries());
+
+            Receiver next = session.nextReceiver();
+            assertSame(receiver1, next);
+            next = session.nextReceiver();
+            assertSame(receiver4, next);
+
+            peer.waitForScriptToComplete();
+            peer.expectClose().respond();
+
+            connection.close();
+
+            peer.waitForScriptToComplete();
+        }
+    }
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
index 574af9ca..2b115e35 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
@@ -221,8 +221,25 @@ public interface Session extends Endpoint<Session> {
      * @param remoteTxnManagerOpenEventHandler
      *          the EventHandler that will be signaled when a {@link TransactionController} link is remotely opened.
      *
-     * @return this session for chaining
+     * @return this {@linkplain Session} for chaining
      */
     Session transactionManagerOpenHandler(EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler);
 
+    //----- Session monitoring handlers
+
+    /**
+     * Allows monitoring of incoming deliveries to receivers attached to this {@link Session}. The
+     * {@link Receiver} that is the target of the incoming delivery will be notified first of the
+     * incoming delivery and any processing should be done using the {@link Receiver#deliveryReadHandler(EventHandler)}.
+     * This event point will be triggered only after the {@link Receiver} level handler and should be
+     * used to monitor deliveries passing through a session for logging or other state related actions
+     * performed by the service managing this session.
+     *
+     * @param delivery
+     *          the {@link EventHandler} that will be signaled with each {@link IncomingDelivery} read by a receiver of this session.
+     *
+     * @return this {@linkplain Session} for chaining
+     */
+    Session deliveryReadHandler(EventHandler<IncomingDelivery> delivery);
+
 }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
index 16fa318a..3262b9ef 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
@@ -19,9 +19,8 @@ package org.apache.qpid.protonj2.engine.impl;
 import java.lang.ref.SoftReference;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -69,8 +68,8 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn
     private Open remoteOpen;
     private AMQPHeader remoteHeader;
 
-    private Map<Integer, ProtonSession> localSessions = new HashMap<>();
-    private Map<Integer, ProtonSession> remoteSessions = new HashMap<>();
+    private Map<Integer, ProtonSession> localSessions = new LinkedHashMap<>();
+    private Map<Integer, ProtonSession> remoteSessions = new LinkedHashMap<>();
 
     // These would be sessions that were begun and ended before the remote ever
     // responded with a matching being and end.  The remote is required to complete
@@ -707,7 +706,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn
         if (localSessions.isEmpty() && remoteSessions.isEmpty()) {
             result = Collections.EMPTY_SET;
         } else {
-            result = new HashSet<>(localSessions.size());
+            result = new LinkedHashSet<>(localSessions.size());
             result.addAll(localSessions.values());
             result.addAll(remoteSessions.values());
         }
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
index 954a29c1..6b74d14a 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
@@ -260,6 +260,12 @@ public class ProtonReceiver extends ProtonLink<Receiver> implements Receiver {
             deliveryReadEventHandler.handle(delivery);
         }
 
+        // Allow session owner to monitor deliveries passing through the session
+        // but only after the receiver handlers have had a chance to handle it.
+        if (session.deliveryReadHandler() != null) {
+            session.deliveryReadHandler().handle(delivery);
+        }
+
         return this;
     }
 
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
index ed72baca..a0b536fb 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
@@ -18,15 +18,16 @@ package org.apache.qpid.protonj2.engine.impl;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.engine.ConnectionState;
 import org.apache.qpid.protonj2.engine.EventHandler;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.engine.Link;
 import org.apache.qpid.protonj2.engine.LinkState;
 import org.apache.qpid.protonj2.engine.Receiver;
@@ -65,8 +66,8 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
     private final ProtonSessionOutgoingWindow outgoingWindow;
     private final ProtonSessionIncomingWindow incomingWindow;
 
-    private final Map<String, ProtonSender> senderByNameMap = new HashMap<>();
-    private final Map<String, ProtonReceiver> receiverByNameMap = new HashMap<>();
+    private final Map<String, ProtonSender> senderByNameMap = new LinkedHashMap<>();
+    private final Map<String, ProtonReceiver> receiverByNameMap = new LinkedHashMap<>();
 
     private final SplayMap<ProtonLink<?>> localLinks = new SplayMap<>();
     private final SplayMap<ProtonLink<?>> remoteLinks = new SplayMap<>();
@@ -85,6 +86,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
     private EventHandler<Sender> remoteSenderOpenEventHandler;
     private EventHandler<Receiver> remoteReceiverOpenEventHandler;
     private EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler;
+    private EventHandler<IncomingDelivery> deliveryReadHandler;
 
     /**
      * Creates a new {@link ProtonSession} instance bound to the given {@link ProtonConnection}.
@@ -314,7 +316,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
         if (senderByNameMap.isEmpty()) {
             result = Collections.EMPTY_SET;
         } else {
-            result = new HashSet<>(senderByNameMap.values());
+            result = new LinkedHashSet<>(senderByNameMap.values());
         }
 
         return result;
@@ -328,7 +330,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
         if (receiverByNameMap.isEmpty()) {
             result = Collections.EMPTY_SET;
         } else {
-            result = new HashSet<>(receiverByNameMap.values());
+            result = new LinkedHashSet<>(receiverByNameMap.values());
         }
 
         return result;
@@ -449,6 +451,16 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
         return remoteTxnManagerOpenEventHandler;
     }
 
+    @Override
+    public ProtonSession deliveryReadHandler(EventHandler<IncomingDelivery> deliveryReadHandler) {
+        this.deliveryReadHandler = deliveryReadHandler;
+        return this;
+    }
+
+    EventHandler<IncomingDelivery> deliveryReadHandler() {
+        return deliveryReadHandler;
+    }
+
     //----- Respond to Connection and Engine state changes
 
     void handleConnectionLocallyClosed(ProtonConnection protonConnection) {
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
index 1cb5c3e2..9e5719c0 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
@@ -2588,4 +2588,41 @@ public class ProtonSessionTest extends ProtonEngineTestSupport {
         assertNotNull(failure);
         assertTrue(failure instanceof ProtocolViolationException);
     }
+
+    @Test
+    public void testSessionWideDeliveryMonitoringHandler() throws Exception {
+        Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        final AtomicBoolean deliveryReadByReceiver = new AtomicBoolean();
+        final AtomicBoolean deliveryReadBySession = new AtomicBoolean();
+
+        peer.expectAMQPHeader().respondWithAMQPHeader();
+        peer.expectOpen().respond().withContainerId("driver");
+        peer.expectBegin().respond();
+        peer.expectAttach().ofReceiver().respond();
+        peer.expectFlow().withLinkCredit(1);
+        peer.remoteTransfer().withHandle(0)
+                             .withDeliveryId(0)
+                             .withDeliveryTag(new byte[] {1})
+                             .onChannel(0)
+                             .queue();
+
+        Connection connection = engine.start().open();
+        Session session = connection.session().open();
+
+        session.deliveryReadHandler((delivery) -> deliveryReadBySession.set(true));
+
+        Receiver receiver = session.receiver("test");
+        receiver.deliveryReadHandler((delivery) -> deliveryReadByReceiver.set(true));
+        receiver.open().addCredit(1);
+
+        peer.waitForScriptToComplete();
+
+        assertTrue(deliveryReadByReceiver.get());
+        assertTrue(deliveryReadBySession.get());
+
+        assertNull(failure);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org