You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/18 21:10:31 UTC
[qpid-protonj2] branch main updated: PROTON-2547 Add nextReceiver API to session and connection
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 62e0584b PROTON-2547 Add nextReceiver API to session and connection
62e0584b is described below
commit 62e0584b6697de1d57f06f07eb2c4c94e89d634a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 18 16:43:22 2022 -0400
PROTON-2547 Add nextReceiver API to session and connection
Allow a user to poll a session or the base connection's default session
for the next receiver that has a delivery or wait for a delivery to
arrive for any of the session receivers.
---
.../protonj2/client/examples/NextReceiver.java | 68 ++
.../apache/qpid/protonj2/client/Connection.java | 66 ++
.../qpid/protonj2/client/ConnectionOptions.java | 24 +
.../qpid/protonj2/client/NextReceiverPolicy.java | 78 ++
.../org/apache/qpid/protonj2/client/Session.java | 67 ++
.../qpid/protonj2/client/SessionOptions.java | 23 +
.../protonj2/client/impl/ClientConnection.java | 25 +
.../qpid/protonj2/client/impl/ClientLinkType.java | 8 +-
.../client/impl/ClientNextReceiverSelector.java | 207 +++++
.../qpid/protonj2/client/impl/ClientReceiver.java | 34 +-
.../client/impl/ClientReceiverLinkType.java | 1 +
.../qpid/protonj2/client/impl/ClientSession.java | 53 +-
.../protonj2/client/impl/ClientSessionBuilder.java | 1 +
.../qpid/protonj2/client/impl/ConnectionTest.java | 138 +++
.../qpid/protonj2/client/impl/ReceiverTest.java | 234 +++--
.../protonj2/client/impl/ReconnectSessionTest.java | 100 +++
.../qpid/protonj2/client/impl/SessionTest.java | 990 +++++++++++++++++++++
.../org/apache/qpid/protonj2/engine/Session.java | 19 +-
.../protonj2/engine/impl/ProtonConnection.java | 9 +-
.../qpid/protonj2/engine/impl/ProtonReceiver.java | 6 +
.../qpid/protonj2/engine/impl/ProtonSession.java | 22 +-
.../protonj2/engine/impl/ProtonSessionTest.java | 37 +
22 files changed, 2059 insertions(+), 151 deletions(-)
diff --git a/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java b/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java
new file mode 100644
index 00000000..3282802d
--- /dev/null
+++ b/protonj2-client-examples/src/main/java/org/apache/qpid/protonj2/client/examples/NextReceiver.java
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protonj2.client.examples;
+
+import java.util.concurrent.ForkJoinPool;
+
+import org.apache.qpid.protonj2.client.Client;
+import org.apache.qpid.protonj2.client.Connection;
+import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
+import org.apache.qpid.protonj2.client.Message;
+
+public class NextReceiver {
+
+    /**
+     * Opens two receivers on a single connection, schedules a background task that
+     * sends one message to each receiver address, and then consumes both deliveries
+     * by asking the connection for the next receiver that has a delivery available.
+     *
+     * @param args
+     *      unused, configuration is read from system properties.
+     *
+     * @throws Exception if the connection cannot be created or a receive fails.
+     */
+    public static void main(String[] args) throws Exception {
+        final String host = System.getProperty("HOST", "localhost");
+        final int port = Integer.getInteger("PORT", 5672);
+        final String firstAddress = System.getProperty("ADDRESS1", "next-receiver-1-address");
+        final String secondAddress = System.getProperty("ADDRESS2", "next-receiver-2-address");
+
+        final Client container = Client.create();
+
+        final ConnectionOptions connectionOptions = new ConnectionOptions();
+        connectionOptions.user(System.getProperty("USER"));
+        connectionOptions.password(System.getProperty("PASSWORD"));
+
+        try (Connection connection = container.connect(host, port, connectionOptions)) {
+
+            // Both receivers live in the connection default session, which is the
+            // session the connection level nextReceiver call operates on.
+            connection.openReceiver(firstAddress);
+            connection.openReceiver(secondAddress);
+
+            ForkJoinPool.commonPool().execute(() -> sendDelayedMessages(connection, firstAddress, secondAddress));
+
+            final Delivery first = connection.nextReceiver().receive();
+            final Delivery second = connection.nextReceiver().receive();
+
+            System.out.println("Received first message with body: " + first.message().body());
+            System.out.println("Received second message with body: " + second.message().body());
+        }
+    }
+
+    /**
+     * Sends one message to each address, pausing two seconds before each send so the
+     * main thread is already waiting in the next receiver call when they arrive.
+     */
+    private static void sendDelayedMessages(Connection connection, String firstAddress, String secondAddress) {
+        try {
+            Thread.sleep(2000);
+            connection.send(Message.create("Hello World 1").to(firstAddress));
+            Thread.sleep(2000);
+            connection.send(Message.create("Hello World 2").to(secondAddress));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
index c8ffe356..2cac3b4e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Connection.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.client;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
@@ -460,6 +461,71 @@ public interface Connection extends AutoCloseable {
*/
Tracker send(Message<?> message) throws ClientException;
+ /**
+ * Waits indefinitely for a receiver created from the connection default session to have a
+ * delivery ready for receipt. The selection of the next receiver when more than one exists
+ * which has pending deliveries is based upon the configured value of the
+ * {@link ConnectionOptions#defaultNextReceiverPolicy()}.
+ *
+ * @return the next receiver that has a pending delivery available based on policy.
+ *
+ * @throws ClientException if an internal error occurs.
+ *
+ * @see Session#nextReceiver()
+ */
+ Receiver nextReceiver() throws ClientException;
+
+ /**
+ * Waits indefinitely for a receiver created from the connection default session to have a
+ * delivery ready for receipt. The selection of the next receiver when more than one exists
+ * which has pending deliveries is based upon the value of the {@link NextReceiverPolicy}
+ * that is provided by the caller.
+ *
+ * @param policy
+ * The policy to apply when selecting the next receiver.
+ *
+ * @return the next receiver that has a pending delivery available based on policy.
+ *
+ * @throws ClientException if an internal error occurs.
+ *
+ * @see Session#nextReceiver(NextReceiverPolicy)
+ */
+ Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException;
+
+ /**
+ * Waits up to the given time interval for a receiver created from the connection default
+ * session to have a delivery ready for receipt. The selection of the next receiver when more
+ * than one exists which has pending deliveries is based upon the configured value of the
+ * {@link ConnectionOptions#defaultNextReceiverPolicy()}. If no receiver has an incoming delivery
+ * before the given timeout expires the method returns null.
+ *
+ * @param timeout
+ * The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+ * @param unit
+ * The unit of time that the given timeout represents.
+ *
+ * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+ *
+ * @throws ClientException if an internal error occurs.
+ *
+ * @see Session#nextReceiver(long, TimeUnit)
+ */
+ Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException;
+
+ /**
+ * Waits up to the given time interval for a receiver created from the connection default
+ * session to have a delivery ready for receipt. The selection of the next receiver when more
+ * than one exists which has pending deliveries is based upon the {@link NextReceiverPolicy}
+ * provided by the caller. If no receiver has an incoming delivery before the given timeout
+ * expires the method returns null.
+ *
+ * @param policy
+ * The policy to apply when selecting the next receiver.
+ * @param timeout
+ * The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+ * @param unit
+ * The unit of time that the given timeout represents.
+ *
+ * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+ *
+ * @throws ClientException if an internal error occurs.
+ *
+ * @see Session#nextReceiver(NextReceiverPolicy, long, TimeUnit)
+ */
+ Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException;
+
/**
* Returns the properties that the remote provided upon successfully opening the {@link Connection}. If the
* open has not completed yet this method will block to await the open response which carries the remote
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
index a08105ff..da4bf222 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ConnectionOptions.java
@@ -50,6 +50,7 @@ public class ConnectionOptions {
public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
public static final int DEFAULT_CHANNEL_MAX = 65535;
public static final int DEFAULT_MAX_FRAME_SIZE = 65536;
+ public static final NextReceiverPolicy DEFAULT_NEXT_RECEIVER_POLICY = NextReceiverPolicy.ROUND_ROBIN;
private long sendTimeout = DEFAULT_SEND_TIMEOUT;
private long requestTimeout = DEFAULT_REQUEST_TIMEOUT;
@@ -72,6 +73,7 @@ public class ConnectionOptions {
private Map<String, Object> properties;
private String virtualHost;
private boolean traceFrames;
+ private NextReceiverPolicy nextReceiverPolicy = DEFAULT_NEXT_RECEIVER_POLICY;
private BiConsumer<Connection, ConnectionEvent> connectedHandler;
private BiConsumer<Connection, DisconnectionEvent> disconnectedHandler;
@@ -127,6 +129,7 @@ public class ConnectionOptions {
other.interruptedHandler(interruptedHandler);
other.reconnectedHandler(reconnectedHandler);
other.disconnectedHandler(disconnectedHandler);
+ other.defaultNextReceiverPolicy(nextReceiverPolicy);
if (offeredCapabilities != null) {
other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
@@ -646,6 +649,27 @@ public class ConnectionOptions {
return this;
}
+ /**
+ * Returns the policy used to pick a receiver when a next receiver call is made without
+ * an explicit policy. Unless changed through {@link #defaultNextReceiverPolicy(NextReceiverPolicy)}
+ * the value is {@link #DEFAULT_NEXT_RECEIVER_POLICY}.
+ *
+ * @return the configured default next receiver policy for the connection.
+ */
+ public NextReceiverPolicy defaultNextReceiverPolicy() {
+ return nextReceiverPolicy;
+ }
+
+ /**
+ * Configures the default next receiver policy for this connection and any session
+ * that is created without specifying user defined session default options.
+ *
+ * @param policy
+ * The next receiver policy to assign as the default.
+ *
+ * @return this {@link ConnectionOptions} instance.
+ */
+ public ConnectionOptions defaultNextReceiverPolicy(NextReceiverPolicy policy) {
+ // NOTE(review): the value is stored as given with no null check here; presumably a null
+ // policy is only rejected later when a next receiver selection is attempted - confirm.
+ this.nextReceiverPolicy = policy;
+ return this;
+ }
+
/**
* @return the connection failed handler currently registered.
*/
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java
new file mode 100644
index 00000000..92c232d3
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/NextReceiverPolicy.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client;
+
+/**
+ * Determines the behavior of a Session when the next receiver method is called
+ * on that session. Each policy provides a contract on the ordering of returned
+ * receivers from the next receiver API when there are receivers with locally
+ * queued deliveries. When there are no {@link Receiver} instances that have
+ * locally queued deliveries the next receiver API will return the next receiver
+ * to receive a complete incoming delivery unless a timeout was given and that
+ * time period expires in which case it will return <code>null</code>.
+ * <p>
+ * Should the user perform receive calls on a {@link Receiver} directly in multiple
+ * threads the behavior of the next receiver API is undefined and it becomes possible
+ * that the resulting receiver returned from that API will have no actual pending
+ * deliveries due to a race. In most cases the caller can mitigate some risk by using
+ * the {@link Receiver#tryReceive()} API and accounting for a null result.
+ */
+public enum NextReceiverPolicy {
+
+ /**
+ * Examines the list of currently open receivers in the session and returns
+ * the next receiver that has a pending delivery that follows the previously
+ * returned receiver (if any) otherwise the first receiver in the session with
+ * a pending delivery is returned. The order of receivers returned will likely
+ * be creation order however the implementation is not required to follow this
+ * pattern so the caller should not be coded to rely on that ordering.
+ */
+ ROUND_ROBIN,
+
+ /**
+ * Examines the list of currently open receivers in the session and returns a
+ * random selection from the set of receivers that have a pending delivery
+ * immediately available. This provides a means of selecting receivers which
+ * is not prone to sticking to a highly active receiver which can starve out
+ * other receivers which receive only limited traffic.
+ */
+ RANDOM,
+
+ /**
+ * Examines the list of currently open receivers in the session and returns the
+ * first receiver found with an available delivery. This can result in starvation
+ * if that receiver has a continuous feed of new deliveries from the remote as it
+ * will be repeatedly selected by the next receiver API.
+ */
+ FIRST_AVAILABLE,
+
+ /**
+ * Examines the list of currently open receivers in the session and returns the
+ * receiver with the largest backlog of available deliveries. This can result in
+ * starvation if that receiver has a continuous feed of new deliveries from the
+ * remote as it will likely be repeatedly selected by the next receiver API.
+ */
+ LARGEST_BACKLOG,
+
+ /**
+ * Examines the list of currently open receivers in the session and returns the
+ * receiver with the smallest backlog of available deliveries. Only receivers
+ * that have at least one delivery available are considered, so a receiver with
+ * an empty backlog is never the one selected.
+ */
+ SMALLEST_BACKLOG
+
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
index 18260da4..568b93df 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Session.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.client;
import java.util.Map;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientTransactionNotActiveException;
@@ -326,4 +327,70 @@ public interface Session extends AutoCloseable {
*/
Session rollbackTransaction() throws ClientException;
+ /**
+ * Waits indefinitely for a receiver created from this session to have a delivery ready for
+ * receipt. The selection of the next receiver when more than one exists which has pending
+ * deliveries is based upon the configured value of the {@link SessionOptions#defaultNextReceiverPolicy()}
+ * used to create this session or if none was provided then the value is taken from the value
+ * of the {@link ConnectionOptions#defaultNextReceiverPolicy()}.
+ *
+ * @return the next receiver that has a pending delivery available based on policy.
+ *
+ * @throws ClientException if an internal error occurs.
+ */
+ Receiver nextReceiver() throws ClientException;
+
+ /**
+ * Waits indefinitely for a receiver created from this session to have a delivery ready for
+ * receipt. The selection of the next receiver when more than one exists which has pending
+ * deliveries is based upon the value of the {@link NextReceiverPolicy} that is provided by
+ * the caller.
+ *
+ * @param policy
+ * The policy to apply when selecting the next receiver.
+ *
+ * @return the next receiver that has a pending delivery available based on policy.
+ *
+ * @throws ClientException if an internal error occurs.
+ */
+ Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException;
+
+ /**
+ * Waits for the given duration for a receiver created from this session to have a delivery ready
+ * for receipt. The selection of the next receiver when more than one exists which has pending
+ * deliveries is based upon the configured value of the {@link SessionOptions#defaultNextReceiverPolicy()}
+ * used to create this session or if none was provided then the value is taken from the value
+ * of the {@link ConnectionOptions#defaultNextReceiverPolicy()}. If no receiver has an available
+ * delivery within the given timeout this method returns null.
+ *
+ * @param timeout
+ * The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+ * @param unit
+ * The unit of time that the given timeout represents.
+ *
+ * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+ *
+ * @throws ClientException if an internal error occurs.
+ */
+ Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException;
+
+ /**
+ * Waits for the given duration for a receiver created from this session to have a delivery ready
+ * for receipt. The selection of the next receiver when more than one exists which has pending
+ * deliveries is based upon the value of the {@link NextReceiverPolicy} provided by the caller. If
+ * no receiver has an available delivery within the given timeout this method returns null.
+ *
+ * @param policy
+ * The policy to apply when selecting the next receiver.
+ * @param timeout
+ * The timeout value used to control how long the method waits for a new {@link Delivery} to be available.
+ * @param unit
+ * The unit of time that the given timeout represents.
+ *
+ * @return the next receiver that has a pending delivery available based on policy or null if the timeout is reached.
+ *
+ * @throws ClientException if an internal error occurs.
+ */
+ Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException;
+
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
index a61221c3..dac19d5f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SessionOptions.java
@@ -51,6 +51,7 @@ public class SessionOptions {
private String[] offeredCapabilities;
private String[] desiredCapabilities;
private Map<String, Object> properties;
+ private NextReceiverPolicy nextReceiverPolicy = ConnectionOptions.DEFAULT_NEXT_RECEIVER_POLICY;
/**
* Create a new {@link SessionOptions} instance configured with default configuration settings.
@@ -92,6 +93,7 @@ public class SessionOptions {
other.requestTimeout(requestTimeout);
other.incomingCapacity(incomingCapacity);
other.outgoingCapacity(outgoingCapacity);
+ other.defaultNextReceiverPolicy(nextReceiverPolicy);
if (offeredCapabilities != null) {
other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
@@ -398,4 +400,25 @@ public class SessionOptions {
this.outgoingCapacity = outgoingCapacity;
return this;
}
+
+ /**
+ * Returns the policy a session created from these options applies when a next receiver
+ * call is made without an explicit policy. Unless changed the value is
+ * {@link ConnectionOptions#DEFAULT_NEXT_RECEIVER_POLICY}.
+ *
+ * @return the configured default next receiver policy for a session created using these options.
+ */
+ public NextReceiverPolicy defaultNextReceiverPolicy() {
+ return nextReceiverPolicy;
+ }
+
+ /**
+ * Configures the default next receiver policy for the session created with these
+ * configuration options.
+ *
+ * @param policy
+ * The default next receiver policy to assign to a new session.
+ *
+ * @return this {@link SessionOptions} instance.
+ */
+ public SessionOptions defaultNextReceiverPolicy(NextReceiverPolicy policy) {
+ // NOTE(review): the value is stored as given with no null check here; presumably a null
+ // policy is only rejected later when a next receiver selection is attempted - confirm.
+ this.nextReceiverPolicy = policy;
+ return this;
+ }
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index c6765dfc..a1546ebe 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -41,6 +41,7 @@ import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.DisconnectionEvent;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.ReconnectLocation;
@@ -487,6 +488,30 @@ public final class ClientConnection implements Connection {
return request(this, result).send(message);
}
+    @Override
+    public Receiver nextReceiver() throws ClientException {
+        // Reject the call up front when this connection is no longer usable.
+        checkClosedOrFailed();
+
+        // Selection is carried out by the connection's default session.
+        final Receiver selected = defaultSession().nextReceiver();
+        return selected;
+    }
+
+    @Override
+    public Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException {
+        // Reject the call up front when this connection is no longer usable.
+        checkClosedOrFailed();
+
+        // Selection and the timed wait are carried out by the connection's default session.
+        final Receiver selected = defaultSession().nextReceiver(timeout, unit);
+        return selected;
+    }
+
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException {
+        // Reject the call up front when this connection is no longer usable.
+        checkClosedOrFailed();
+
+        // Selection using the caller's policy is carried out by the connection's default session.
+        final Receiver selected = defaultSession().nextReceiver(policy);
+        return selected;
+    }
+
+    @Override
+    public Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException {
+        // Reject the call up front when this connection is no longer usable.
+        checkClosedOrFailed();
+
+        // Selection using the caller's policy and the timed wait are carried out by the default session.
+        final Receiver selected = defaultSession().nextReceiver(policy, timeout, unit);
+        return selected;
+    }
+
@Override
public Map<String, Object> properties() throws ClientException {
waitForOpenToComplete();
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
index 7bb52e0c..852ee616 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -303,7 +303,7 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
if (options.openTimeout() > 0) {
executor.schedule(() -> {
if (!openFuture.isDone()) {
- immediateLinkShutdown(new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
+ immediateLinkShutdown(new ClientOperationTimedOutException("Link open timed out waiting for remote to respond"));
}
}, options.openTimeout(), TimeUnit.MILLISECONDS);
}
@@ -462,7 +462,8 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
protected boolean notClosedOrFailed(ClientFuture<?> request, ProtonType protonLink) {
if (isClosed()) {
- request.failed(new ClientIllegalStateException("The Sender was explicitly closed", failureCause));
+ request.failed(new ClientIllegalStateException(
+ String.format("The %s was explicitly closed", protonLink().isReceiver() ? "Receiver" : "Sender"), failureCause));
return false;
} else if (failureCause != null) {
request.failed(failureCause);
@@ -486,7 +487,8 @@ public abstract class ClientLinkType<LinkType extends Link<LinkType>,
protected void checkClosedOrFailed() throws ClientException {
if (isClosed()) {
- throw new ClientIllegalStateException("The Sender was explicitly closed", failureCause);
+ throw new ClientIllegalStateException(
+ String.format("The %s was explicitly closed", protonLink().isReceiver() ? "Receiver" : "Sender"), failureCause);
} else if (failureCause != null) {
throw failureCause;
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
new file mode 100644
index 00000000..a6f04b5c
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNextReceiverSelector.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import java.security.SecureRandom;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
+import org.apache.qpid.protonj2.client.Receiver;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+
+/**
+ * Implements the various strategies of selecting the next receiver with pending
+ * messages and or waiting for new incoming messages.
+ * <p>
+ * Requests that cannot be satisfied immediately are queued and completed in arrival
+ * order as complete deliveries are read by the parent session.
+ * <p>
+ * NOTE(review): the pending queue and the lazily created random source are not
+ * synchronized, so this type presumably relies on always being called from the
+ * session's single event loop thread - confirm against the callers.
+ */
+final class ClientNextReceiverSelector {
+
+    private static final String LAST_RETURNED_STATE_KEY = "Last_Returned_State";
+
+    private final ArrayDeque<ClientFuture<Receiver>> pending = new ArrayDeque<>();
+    private final ClientSession session;
+    private SecureRandom srand;
+
+    /**
+     * Creates a selector for the given session and registers for delivery read
+     * events on the session's current proton session.
+     *
+     * @param session
+     *      The client session whose receivers this selector chooses from.
+     */
+    public ClientNextReceiverSelector(ClientSession session) {
+        this.session = session;
+
+        // Same processing as reconnect for session delivery tapping
+        handleReconnect();
+    }
+
+    /**
+     * Attempts to select a receiver that already has a queued delivery using the given
+     * policy and completes the request with it. When no receiver qualifies the request
+     * is queued until a complete delivery arrives, and if the timeout is greater than
+     * zero the request is completed with <code>null</code> once that time elapses.
+     *
+     * @param request
+     *      The future to complete with the selected receiver, or null on timeout.
+     * @param policy
+     *      The selection policy to apply, must not be null.
+     * @param timeout
+     *      The time in milliseconds to wait for a delivery; values of zero or less
+     *      schedule no timeout so the request waits until a delivery or shutdown.
+     */
+    public void nextReceiver(ClientFuture<Receiver> request, NextReceiverPolicy policy, long timeout) {
+        Objects.requireNonNull(policy, "The next receiver selection policy cannot be null");
+
+        ClientReceiver result = null;
+
+        switch (policy) {
+            case ROUND_ROBIN:
+                result = selectNextAvailable();
+                break;
+            case FIRST_AVAILABLE:
+                result = selectFirstAvailable();
+                break;
+            case LARGEST_BACKLOG:
+                result = selectLargestBacklog();
+                break;
+            case SMALLEST_BACKLOG:
+                result = selectSmallestBacklog();
+                break;
+            case RANDOM:
+                result = selectRandomReceiver();
+                break;
+            default:
+                request.failed(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
+                // The request is already failed, it must not also be queued as a pending waiter.
+                return;
+        }
+
+        if (result == null) {
+            pending.add(request); // Wait for the next incoming delivery
+            if (timeout > 0) {
+                session.getScheduler().schedule(() -> {
+                    if (!request.isDone()) {
+                        pending.remove(request);
+                        request.complete(null);
+                    }
+                }, timeout, TimeUnit.MILLISECONDS);
+            }
+        } else {
+            // Track last returned to update state for Round Robin next receiver dispatch
+            // this effectively ties all policies together in updating the next result from
+            // a call that requests the round robin fairness policy.
+            result.protonLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, result);
+
+            request.complete(result);
+        }
+    }
+
+    /**
+     * Registers this selector's delivery read handler on the session's current proton
+     * session, called on creation and again whenever the session is rebuilt on reconnect.
+     */
+    public void handleReconnect() {
+        session.getProtonSession().deliveryReadHandler(this::deliveryReadHandler);
+    }
+
+    /**
+     * Fails every queued next receiver request with a cause derived from the session
+     * state and then empties the queue.
+     */
+    public void handleShutdown() {
+        ClientException cause = null;
+
+        if (session.isClosed()) {
+            cause = new ClientIllegalStateException("The Session was explicitly closed", session.getFailureCause());
+        } else if (session.getFailureCause() != null) {
+            cause = session.getFailureCause();
+        } else {
+            cause = new ClientIllegalStateException("The session was closed without a specific error being provided");
+        }
+
+        for (ClientFuture<Receiver> request : pending) {
+            request.failed(cause);
+        }
+
+        pending.clear();
+    }
+
+    /**
+     * Streams the client receivers of the session that currently have at least one queued
+     * delivery, in the iteration order of the proton session's receiver collection.
+     */
+    private java.util.stream.Stream<ClientReceiver> receiversWithDeliveries() {
+        return session.getProtonSession().receivers().stream()
+                      .filter((r) -> r.getLinkedResource() instanceof ClientReceiver &&
+                                     r.getLinkedResource(ClientReceiver.class).queuedDeliveries() > 0)
+                      .map((r) -> (ClientReceiver) r.getLinkedResource());
+    }
+
+    /**
+     * @return a randomly chosen receiver from those with queued deliveries or null if none.
+     */
+    private ClientReceiver selectRandomReceiver() {
+        ArrayList<ClientReceiver> candidates =
+            receiversWithDeliveries().collect(Collectors.toCollection(ArrayList::new));
+
+        if (srand == null) {
+            srand = new SecureRandom();
+        }
+
+        Collections.shuffle(candidates, srand);
+
+        return candidates.isEmpty() ? null : candidates.get(0);
+    }
+
+    /**
+     * Selects the first receiver with queued deliveries that follows the previously
+     * returned receiver, falling back to the first available receiver in the session
+     * when nothing after it qualifies or when there is no usable previous receiver.
+     *
+     * @return the next receiver in round robin order or null if none has deliveries.
+     */
+    private ClientReceiver selectNextAvailable() {
+        final ClientReceiver lastReceiver = session.getProtonSession().getAttachments().get(LAST_RETURNED_STATE_KEY);
+
+        ClientReceiver result = null;
+
+        if (lastReceiver != null && !lastReceiver.protonReceiver.isLocallyClosedOrDetached()) {
+            boolean foundLast = false;
+            for (org.apache.qpid.protonj2.engine.Receiver protonReceiver : session.getProtonSession().receivers()) {
+                if (protonReceiver.getLinkedResource() instanceof ClientReceiver) {
+                    if (foundLast) {
+                        ClientReceiver candidate = protonReceiver.getLinkedResource();
+                        if (candidate.queuedDeliveries() > 0) {
+                            // Stop at the first match; continuing the scan would return the last
+                            // qualifying receiver instead and skip over the ones in between.
+                            result = candidate;
+                            break;
+                        }
+                    } else {
+                        foundLast = protonReceiver.getLinkedResource() == lastReceiver;
+                    }
+                }
+            }
+        } else {
+            // The previous receiver is gone or was never set, drop the stale reference.
+            session.getProtonSession().getAttachments().set(LAST_RETURNED_STATE_KEY, null);
+        }
+
+        return result != null ? result : selectFirstAvailable();
+    }
+
+    /**
+     * @return the first receiver found with queued deliveries or null if none.
+     */
+    private ClientReceiver selectFirstAvailable() {
+        return receiversWithDeliveries().findFirst().orElse(null);
+    }
+
+    /**
+     * @return the receiver with the most queued deliveries or null if none has any.
+     */
+    private ClientReceiver selectLargestBacklog() {
+        return receiversWithDeliveries()
+            .max(Comparator.comparingLong(ClientReceiver::queuedDeliveries))
+            .orElse(null);
+    }
+
+    /**
+     * @return the receiver with the fewest (but at least one) queued deliveries or null if none has any.
+     */
+    private ClientReceiver selectSmallestBacklog() {
+        return receiversWithDeliveries()
+            .min(Comparator.comparingLong(ClientReceiver::queuedDeliveries))
+            .orElse(null);
+    }
+
+    /**
+     * Completes the oldest pending next receiver request when a complete, non-aborted
+     * delivery is read on a link that is backed by a {@link ClientReceiver}.
+     */
+    private void deliveryReadHandler(IncomingDelivery delivery) {
+        // When a new delivery arrives that is completed
+        if (!pending.isEmpty() && !delivery.isPartial() && !delivery.isAborted()) {
+            // We only handle next receiver events for normal client receivers and
+            // not for stream receiver types etc.
+            if (delivery.getLink().getLinkedResource() instanceof ClientReceiver) {
+                ClientReceiver receiver = delivery.getLink().getLinkedResource();
+
+                // Track last returned to update state for Round Robin next receiver dispatch
+                delivery.getLink().getSession().getAttachments().set(LAST_RETURNED_STATE_KEY, receiver);
+
+                pending.poll().complete(receiver);
+            }
+        }
+    }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index 91c7270a..1f7cafd4 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -42,7 +42,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final ReceiverOptions options;
- private final FifoDeliveryQueue messageQueue;
+ private final FifoDeliveryQueue deliveryQueue;
ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
super(session, receiverId, options, receiver);
@@ -53,8 +53,8 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
protonReceiver.addCredit(options.creditWindow());
}
- messageQueue = new FifoDeliveryQueue(options.creditWindow());
- messageQueue.start();
+ deliveryQueue = new FifoDeliveryQueue(options.creditWindow());
+ deliveryQueue.start();
}
@Override
@@ -67,7 +67,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
checkClosedOrFailed();
try {
- ClientDelivery delivery = messageQueue.dequeue(units.toMillis(timeout));
+ ClientDelivery delivery = deliveryQueue.dequeue(units.toMillis(timeout));
if (delivery != null) {
if (options.autoAccept()) {
disposition(delivery.protonDelivery(), Accepted.getInstance(), options.autoSettle());
@@ -91,7 +91,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
public Delivery tryReceive() throws ClientException {
checkClosedOrFailed();
- Delivery delivery = messageQueue.dequeueNoWait();
+ Delivery delivery = deliveryQueue.dequeueNoWait();
if (delivery != null) {
if (options.autoAccept()) {
delivery.disposition(org.apache.qpid.protonj2.client.DeliveryState.accepted(), options.autoSettle());
@@ -107,7 +107,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
@Override
public long queuedDeliveries() {
- return messageQueue.size();
+ return deliveryQueue.size();
}
@Override
@@ -182,8 +182,8 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
}
if (!delivery.isPartial()) {
- LOG.trace("Receiver {} has incoming Message(s).", linkId);
- messageQueue.enqueue(new ClientDelivery(this, delivery));
+ LOG.trace("{} has incoming Message(s).", this);
+ deliveryQueue.enqueue(new ClientDelivery(this, delivery));
} else {
delivery.claimAvailableBytes();
}
@@ -197,7 +197,7 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
if (creditWindow > 0) {
int currentCredit = protonReceiver.getCredit();
if (currentCredit <= creditWindow * 0.5) {
- int potentialPrefetch = currentCredit + messageQueue.size();
+ int potentialPrefetch = currentCredit + deliveryQueue.size();
if (potentialPrefetch <= creditWindow * 0.7) {
int additionalCredit = creditWindow - potentialPrefetch;
@@ -213,11 +213,17 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
}
}
+ /**
+ * Stops the delivery queue so that callers blocked waiting on it are released
+ * and then discards any deliveries that are still queued.
+ */
+ @Override
+ protected void linkSpecificLocalCloseHandler() {
+ deliveryQueue.stop(); // Ensure blocked receivers are all unblocked.
+ deliveryQueue.clear();
+ }
+
@Override
protected void recreateLinkForReconnect() {
- int previousCredit = protonReceiver.getCredit() + messageQueue.size();
+ int previousCredit = protonReceiver.getCredit() + deliveryQueue.size();
- messageQueue.clear(); // Prefetched messages should be discarded.
+ deliveryQueue.clear(); // Prefetched messages should be discarded.
if (drainingFuture != null) {
drainingFuture.complete(this);
@@ -234,10 +240,4 @@ public final class ClientReceiver extends ClientReceiverLinkType<Receiver> imple
protonReceiver.setLinkedResource(this);
protonReceiver.addCredit(previousCredit);
}
-
- @Override
- protected void linkSpecificLocalCloseHandler() {
- messageQueue.stop(); // Ensure blocked receivers are all unblocked.
- messageQueue.clear();
- }
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
index 76df24e5..284a78a5 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -48,6 +48,7 @@ public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverT
super(session, linkId, options);
this.protonReceiver = protonReceiver;
+ this.protonReceiver.setLinkedResource(self());
}
@Override
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
index 9455fcb7..c38beab6 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
@@ -16,6 +16,7 @@
*/
package org.apache.qpid.protonj2.client.impl;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import org.apache.qpid.protonj2.client.ErrorCondition;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.Sender;
@@ -52,7 +54,7 @@ import org.slf4j.LoggerFactory;
*/
public class ClientSession implements Session {
- private static final Logger LOG = LoggerFactory.getLogger(ClientSession.class);
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final long INFINITE = -1;
@@ -70,6 +72,7 @@ public class ClientSession implements Session {
private final ClientSenderBuilder senderBuilder;
private final ClientReceiverBuilder receiverBuilder;
+ private ClientNextReceiverSelector nextReceiverSelector;
private volatile int closed;
private volatile ClientException failureCause;
private ClientTransactionContext txnContext = NO_OP_TXN_CONTEXT;
@@ -351,6 +354,38 @@ public class ClientSession implements Session {
return connection.request(this, rollbackFuture);
}
+ // Delegates to the four argument overload using the default policy from the
+ // session options and a timeout of -1 microseconds. That overload converts
+ // with unit.toMillis(...), which truncates -1 microseconds to 0 milliseconds.
+ @Override
+ public Receiver nextReceiver() throws ClientException {
+ return nextReceiver(options.defaultNextReceiverPolicy(), -1, TimeUnit.MICROSECONDS);
+ }
+
+ // Delegates to the four argument overload using the default policy from the
+ // session options together with the caller supplied timeout and unit.
+ @Override
+ public Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException {
+ return nextReceiver(options.defaultNextReceiverPolicy(), timeout, unit);
+ }
+
+ // Delegates to the four argument overload using the caller supplied policy
+ // and a timeout of -1 microseconds. That overload converts with
+ // unit.toMillis(...), which truncates -1 microseconds to 0 milliseconds.
+ @Override
+ public Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException {
+ return nextReceiver(policy, -1, TimeUnit.MICROSECONDS);
+ }
+
+ /**
+ * Asks the next receiver selector for a receiver chosen by the given policy.
+ * The selection runs as a task on the session serializer and the calling
+ * thread waits on the resulting future through the connection's request
+ * handling.
+ *
+ * @param policy the selection policy handed to the next receiver selector.
+ * @param timeout the amount of time to wait, expressed in the given unit.
+ * @param unit the unit of the timeout value.
+ *
+ * @return the outcome of the next receiver request.
+ *
+ * @throws ClientException if the session is closed or failed, or the request fails.
+ */
+ @Override
+ public Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException {
+ checkClosedOrFailed();
+ final ClientFuture<Receiver> nextPending = getFutureFactory().createFuture();
+
+ serializer.execute(() -> {
+ try {
+ checkClosedOrFailed();
+
+ // TimeUnit conversions to a coarser unit truncate toward zero, so a positive
+ // timeout below one millisecond would reach the selector as 0, the same value
+ // the overloads without a timeout produce. Round such values up to a single
+ // millisecond; zero and negative timeouts are converted exactly as before.
+ final long converted = unit.toMillis(timeout);
+ final long timeoutMillis = timeout > 0 ? Math.max(1, converted) : converted;
+
+ getNextReceiverSelector().nextReceiver(nextPending, policy, timeoutMillis);
+ } catch (Throwable error) {
+ nextPending.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+ }
+ });
+
+ return connection.request(this, nextPending);
+ }
+
//----- Internal resource open APIs expected to be called from the connection event loop
ClientReceiver internalOpenReceiver(String address, ReceiverOptions receiverOptions) throws ClientException {
@@ -488,6 +523,14 @@ public class ClientSession implements Session {
}
}
+ private ClientNextReceiverSelector getNextReceiverSelector() {
+ if (nextReceiverSelector == null) {
+ nextReceiverSelector = new ClientNextReceiverSelector(this);
+ }
+
+ return nextReceiverSelector;
+ }
+
//----- Handle Events from the Proton Session
private void handleLocalOpen(org.apache.qpid.protonj2.engine.Session session) {
@@ -550,6 +593,10 @@ public class ClientSession implements Session {
protonSession.close();
protonSession = configureSession(ClientSessionBuilder.recreateSession(connection, protonSession, options));
+ if (nextReceiverSelector != null) {
+ nextReceiverSelector.handleReconnect();
+ }
+
open();
} else {
final Connection connection = engine.connection();
@@ -580,6 +627,10 @@ public class ClientSession implements Session {
} catch (Exception ignore) {
}
+ if (nextReceiverSelector != null) {
+ nextReceiverSelector.handleShutdown();
+ }
+
if (failureCause != null) {
openFuture.failed(failureCause);
} else {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
index 23f3a3d1..c212cb7b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSessionBuilder.java
@@ -76,6 +76,7 @@ final class ClientSessionBuilder {
sessionOptions.requestTimeout(connectionOptions.requestTimeout());
sessionOptions.sendTimeout(connectionOptions.sendTimeout());
sessionOptions.drainTimeout(connectionOptions.drainTimeout());
+ sessionOptions.defaultNextReceiverPolicy(connectionOptions.defaultNextReceiverPolicy());
}
defaultSessionOptions = sessionOptions;
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
index e13863d8..5921f90b 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ConnectionTest.java
@@ -29,7 +29,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.ClientOptions;
@@ -37,6 +39,7 @@ import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.ErrorCondition;
import org.apache.qpid.protonj2.client.Message;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.Sender;
@@ -48,11 +51,13 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIOException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusDurability;
import org.apache.qpid.protonj2.test.driver.codec.messaging.TerminusExpiryPolicy;
import org.apache.qpid.protonj2.test.driver.matchers.messaging.SourceMatcher;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.transport.AMQPHeader;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.apache.qpid.protonj2.types.transport.ConnectionError;
@@ -1606,6 +1611,139 @@ public class ConnectionTest extends ImperativeClientTestCase {
}
}
+ /**
+ * Configures the connection with SMALLEST_BACKLOG as the default next receiver
+ * policy, queues one, two and one deliveries on three receivers, and checks that
+ * an explicit LARGEST_BACKLOG request returns the receiver holding two deliveries.
+ */
+ @Test
+ public void testUserSpeicifedNextReceiverPolicyOverridesConfiguration() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ // One transfer on handle 0, two on handle 1 and one on handle 2
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = connection.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = connection.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+ // The policy passed on the call must win over the configured default
+ Receiver next = connection.nextReceiver(NextReceiverPolicy.LARGEST_BACKLOG);
+ assertSame(receiver2, next);
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ // Runs the shared closed-while-waiting scenario with RANDOM as the default policy.
+ @Test
+ public void testNextReceiverThrowsAfterConnectionClosedRandom() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.RANDOM);
+ }
+
+ // Runs the shared closed-while-waiting scenario with LARGEST_BACKLOG as the default policy.
+ @Test
+ public void testNextReceiverThrowsAfterConnectionClosedLargestBacklog() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LARGEST_BACKLOG);
+ }
+
+ // Runs the shared closed-while-waiting scenario with SMALLEST_BACKLOG as the default policy.
+ @Test
+ public void testNextReceiverThrowsAfterConnectionClosedSmallestBacklog() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SMALLEST_BACKLOG);
+ }
+
+ // Runs the shared closed-while-waiting scenario with FIRST_AVAILABLE as the default policy.
+ @Test
+ public void testNextReceiverThrowsAfterConnectionClosedFirstAvailable() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FIRST_AVAILABLE);
+ }
+
+ /**
+ * Shared scenario: a background task calls connection.nextReceiver() with no
+ * receivers opened, the connection is then closed asynchronously, and the test
+ * checks that the background call ended with a ClientConnectionRemotelyClosedException.
+ *
+ * @param policy the policy installed as the connection's default next receiver policy.
+ */
+ public void doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy) throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(1);
+ final AtomicReference<Exception> error = new AtomicReference<>();
+
+ final Client container = Client.create();
+ final ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+ final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ // NOTE(review): the latch is released before nextReceiver() is entered, so the
+ // close below may run before this call is actually waiting - confirm the
+ // expected exception type is the same in that ordering.
+ started.countDown();
+ connection.nextReceiver();
+ } catch (ClientException e) {
+ error.set(e);
+ } finally {
+ done.countDown();
+ }
+ });
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ assertTrue(started.await(10, TimeUnit.SECONDS));
+
+ connection.closeAsync();
+
+ // The background call must finish, and must have finished with the expected error
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+ assertTrue(error.get() instanceof ClientConnectionRemotelyClosedException);
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
@Disabled("Disabled due to requirement of hard coded port")
@Test
public void testLocalPortOption() throws Exception {
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
index 8f88e35b..f18a46db 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReceiverTest.java
@@ -2649,124 +2649,120 @@ public class ReceiverTest extends ImperativeClientTestCase {
byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
try (ProtonTestServer peer = new ProtonTestServer()) {
- peer.expectSASLAnonymousConnect();
- peer.expectOpen().respond();
- peer.expectBegin().respond();
- peer.expectAttach().ofReceiver().respond();
- peer.expectFlow().withLinkCredit(10);
- for (int i = 0; i < 10; ++i) {
- peer.remoteTransfer().withDeliveryId(i)
- .withMore(false)
- .withMessageFormat(0)
- .withPayload(payload).queue();
- }
- peer.start();
-
- URI remoteURI = peer.getServerURI();
-
- LOG.info("Test started, peer listening on: {}", remoteURI);
-
- Client container = Client.create();
- Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
-
- ReceiverOptions options = new ReceiverOptions();
- options.autoAccept(autoAccept);
- options.creditWindow(10);
-
- Receiver receiver = connection.openReceiver("test-receiver", options);
-
- Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
-
- peer.waitForScriptToComplete();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(0);
- peer.expectDisposition().withFirst(1);
- }
-
- // Consume messages 1 and 2 which should not provoke credit replenishment
- // as there are still 8 outstanding which is above the 70% mark
- assertNotNull(receiver.receive()); // #1
- assertNotNull(receiver.receive()); // #2
-
- peer.waitForScriptToComplete();
- peer.expectAttach().ofSender().respond();
- peer.expectDetach().respond();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(2);
- }
- peer.expectFlow().withLinkCredit(3);
-
- // Ensure that no additional frames from last receive overlap with this one
- connection.openSender("test").openFuture().get().close();
-
- // Now consume message 3 which will trip the replenish barrier and the
- // credit should be updated to reflect that we still have 7 queued
- assertNotNull(receiver.receive()); // #3
-
- peer.waitForScriptToComplete();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(3);
- peer.expectDisposition().withFirst(4);
- }
-
- // Consume messages 4 and 5 which should not provoke credit replenishment
- // as there are still 5 outstanding plus the credit we sent last time
- // which is above the 70% mark
- assertNotNull(receiver.receive()); // #4
- assertNotNull(receiver.receive()); // #5
-
- peer.waitForScriptToComplete();
- peer.expectAttach().ofSender().respond();
- peer.expectDetach().respond();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(5);
- }
- peer.expectFlow().withLinkCredit(6);
-
- // Ensure that no additional frames from last receive overlap with this one
- connection.openSender("test").openFuture().get().close();
-
- // Consume number 6 which means we only have 4 outstanding plus the three
- // that we sent last time we flowed which is 70% of possible prefetch so
- // we should flow to top off credit which would be 6 since we have four
- // still pending
- assertNotNull(receiver.receive()); // #6
-
- peer.waitForScriptToComplete();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(6);
- peer.expectDisposition().withFirst(7);
- }
-
- // Consume deliveries 7 and 8 which should not flow as we should be
- // above the threshold of 70% since we would now have 2 outstanding
- // and 6 credits on the link
- assertNotNull(receiver.receive()); // #7
- assertNotNull(receiver.receive()); // #8
-
- peer.waitForScriptToComplete();
- if (autoAccept)
- {
- peer.expectDisposition().withFirst(8);
- peer.expectDisposition().withFirst(9);
- }
-
- // Now consume 9 and 10 but we still shouldn't flow more credit because
- // the link credit is above the 50% mark for overall credit windowing.
- assertNotNull(receiver.receive()); // #9
- assertNotNull(receiver.receive()); // #10
-
- peer.waitForScriptToComplete();
- peer.expectClose().respond();
-
- connection.close();
-
- peer.waitForScriptToComplete();
- }
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ for (int i = 0; i < 10; ++i) {
+ peer.remoteTransfer().withDeliveryId(i)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ }
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+ ReceiverOptions options = new ReceiverOptions();
+ options.autoAccept(autoAccept);
+ options.creditWindow(10);
+
+ Receiver receiver = connection.openReceiver("test-receiver", options);
+
+ Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
+
+ peer.waitForScriptToComplete();
+ if (autoAccept)
+ {
+ peer.expectDisposition().withFirst(0);
+ peer.expectDisposition().withFirst(1);
+ }
+
+ // Consume messages 1 and 2 which should not provoke credit replenishment
+ // as there are still 8 outstanding which is above the 70% mark
+ assertNotNull(receiver.receive()); // #1
+ assertNotNull(receiver.receive()); // #2
+
+ peer.waitForScriptToComplete();
+ peer.expectAttach().ofSender().respond();
+ peer.expectDetach().respond();
+ if (autoAccept)
+ {
+ peer.expectDisposition().withFirst(2);
+ }
+ peer.expectFlow().withLinkCredit(3);
+
+ // Ensure that no additional frames from last receive overlap with this one
+ connection.openSender("test").openFuture().get().close();
+
+ // Now consume message 3 which will trip the replenish barrier and the
+ // credit should be updated to reflect that we still have 7 queued
+ assertNotNull(receiver.receive()); // #3
+
+ peer.waitForScriptToComplete();
+ if (autoAccept) {
+ peer.expectDisposition().withFirst(3);
+ peer.expectDisposition().withFirst(4);
+ }
+
+ // Consume messages 4 and 5 which should not provoke credit replenishment
+ // as there are still 5 outstanding plus the credit we sent last time
+ // which is above the 70% mark
+ assertNotNull(receiver.receive()); // #4
+ assertNotNull(receiver.receive()); // #5
+
+ peer.waitForScriptToComplete();
+ peer.expectAttach().ofSender().respond();
+ peer.expectDetach().respond();
+ if (autoAccept) {
+ peer.expectDisposition().withFirst(5);
+ }
+ peer.expectFlow().withLinkCredit(6);
+
+ // Ensure that no additional frames from last receive overlap with this one
+ connection.openSender("test").openFuture().get().close();
+
+ // Consume number 6 which means we only have 4 outstanding plus the three
+ // that we sent last time we flowed which is 70% of possible prefetch so
+ // we should flow to top off credit which would be 6 since we have four
+ // still pending
+ assertNotNull(receiver.receive()); // #6
+
+ peer.waitForScriptToComplete();
+ if (autoAccept) {
+ peer.expectDisposition().withFirst(6);
+ peer.expectDisposition().withFirst(7);
+ }
+
+ // Consume deliveries 7 and 8 which should not flow as we should be
+ // above the threshold of 70% since we would now have 2 outstanding
+ // and 6 credits on the link
+ assertNotNull(receiver.receive()); // #7
+ assertNotNull(receiver.receive()); // #8
+
+ peer.waitForScriptToComplete();
+ if (autoAccept) {
+ peer.expectDisposition().withFirst(8);
+ peer.expectDisposition().withFirst(9);
+ }
+
+ // Now consume 9 and 10 but we still shouldn't flow more credit because
+ // the link credit is above the 50% mark for overall credit windowing.
+ assertNotNull(receiver.receive()); // #9
+ assertNotNull(receiver.receive()); // #10
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
}
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
index 8da2b6f6..dfefbf6d 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectSessionTest.java
@@ -17,19 +17,27 @@
package org.apache.qpid.protonj2.client.impl;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
import org.apache.qpid.protonj2.client.Receiver;
+import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.Session;
import org.apache.qpid.protonj2.client.SessionOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
@@ -256,4 +264,96 @@ class ReconnectSessionTest extends ImperativeClientTestCase {
finalPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ // Runs the shared reconnect-then-deliver scenario with RANDOM as the default policy.
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesRandom() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RANDOM);
+ }
+
+ // Runs the shared reconnect-then-deliver scenario with LARGEST_BACKLOG as the default policy.
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesLargestBacklog() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LARGEST_BACKLOG);
+ }
+
+ // Runs the shared reconnect-then-deliver scenario with SMALLEST_BACKLOG as the default policy.
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SMALLEST_BACKLOG);
+ }
+
+ // Runs the shared reconnect-then-deliver scenario with FIRST_AVAILABLE as the default policy.
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesFirstAvailable() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FIRST_AVAILABLE);
+ }
+
+ public void doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy) throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+ try (ProtonTestServer firstPeer = new ProtonTestServer();
+ ProtonTestServer finalPeer = new ProtonTestServer()) {
+
+ firstPeer.expectSASLAnonymousConnect();
+ firstPeer.expectOpen().respond();
+ firstPeer.expectBegin().respond();
+ firstPeer.expectAttach().ofReceiver().respond();
+ firstPeer.expectFlow().withLinkCredit(10);
+ firstPeer.dropAfterLastHandler();
+ firstPeer.start();
+
+ finalPeer.expectSASLAnonymousConnect();
+ finalPeer.expectOpen().respond();
+ finalPeer.expectBegin().respond();
+ finalPeer.expectAttach().ofReceiver().respond();
+ finalPeer.expectFlow().withLinkCredit(10);
+ finalPeer.start();
+
+ final URI firstURI = firstPeer.getServerURI();
+ final URI finalURI = finalPeer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", firstURI);
+
+ final CountDownLatch done = new CountDownLatch(1);
+
+ ConnectionOptions options = new ConnectionOptions();
+ options.defaultNextReceiverPolicy(policy);
+ options.reconnectOptions().reconnectEnabled(true);
+ options.reconnectOptions().addReconnectLocation(finalURI.getHost(), finalURI.getPort());
+
+ Client container = Client.create();
+ Connection connection = container.connect(firstURI.getHost(), firstURI.getPort(), options);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ Receiver receiver = connection.nextReceiver();
+ Delivery delivery = receiver.receive();
+ LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+ done.countDown();
+ } catch (ClientException e) {
+ e.printStackTrace();
+ }
+ });
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+ firstPeer.waitForScriptToComplete();
+ finalPeer.waitForScriptToComplete();
+
+ finalPeer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).later(15);
+
+ finalPeer.waitForScriptToComplete();
+
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ finalPeer.waitForScriptToComplete();
+ finalPeer.expectClose().respond();
+
+ connection.close();
+
+ finalPeer.waitForScriptToComplete();
+ }
+ }
}
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
index c39a0e4b..475fa56b 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SessionTest.java
@@ -18,6 +18,9 @@ package org.apache.qpid.protonj2.client.impl;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -25,13 +28,19 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
+import org.apache.qpid.protonj2.client.Delivery;
import org.apache.qpid.protonj2.client.ErrorCondition;
+import org.apache.qpid.protonj2.client.NextReceiverPolicy;
+import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.ReceiverOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.Session;
@@ -40,7 +49,9 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIOException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
+import org.apache.qpid.protonj2.types.messaging.AmqpValue;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -589,4 +600,983 @@ public class SessionTest extends ImperativeClientTestCase {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
+
+ @Test
+ public void testNextReceiverFromDefaultSessionReturnsSameReceiverForQueuedDeliveries() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: a single receiver is sent ten transfers, so every nextReceiver() call below must return that receiver.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ for (int i = 0; i < 10; ++i) {
+ peer.remoteTransfer().withDeliveryId(i)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ }
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions connOptions = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.FIRST_AVAILABLE);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connOptions);
+ // The credit window is set to 0 and credit is granted manually; the scripted flow expects exactly 10 credits.
+ ReceiverOptions options = new ReceiverOptions().creditWindow(0).autoAccept(false);
+ Receiver receiver = connection.openReceiver("test-receiver", options);
+ receiver.addCredit(10);
+ // Wait until the receiver reports ten queued deliveries before polling for the next receiver.
+ Wait.waitFor(() -> receiver.queuedDeliveries() == 10);
+
+ peer.waitForScriptToComplete();
+
+ for (int i = 0; i < 10; ++i) {
+ Receiver nextReceiver = connection.nextReceiver();
+ assertSame(receiver, nextReceiver);
+ Delivery delivery = nextReceiver.receive();
+ assertNotNull(delivery);
+ }
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverTimesOut() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+ // Scenario: the receiver attaches and grants credit, but the peer script never sends a transfer.
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+ connection.openReceiver("test-receiver").openFuture().get();
+
+ peer.waitForScriptToComplete();
+ // With nothing queued, the timed nextReceiver call is expected to return null instead of a receiver.
+ assertNull(connection.nextReceiver(10, TimeUnit.MILLISECONDS));
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverReturnsAllReceiversEventually() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: three receivers each get one transfer (handles 0, 1 and 2); three polls must yield three distinct receivers.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+
+ ReceiverOptions options = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ connection.openReceiver("test-receiver1", options).openFuture().get();
+ connection.openReceiver("test-receiver2", options).openFuture().get();
+ connection.openReceiver("test-receiver3", options).openFuture().get();
+
+ peer.waitForScriptToComplete();
+ // Poll three times with FIRST_AVAILABLE, taking one delivery from each receiver that is returned.
+ Receiver receiver1 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+ assertNotNull(receiver1.receive());
+ Receiver receiver2 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+ assertNotNull(receiver2.receive());
+ Receiver receiver3 = connection.nextReceiver(NextReceiverPolicy.FIRST_AVAILABLE);
+ assertNotNull(receiver3.receive());
+
+ assertNotSame(receiver1, receiver2);
+ assertNotSame(receiver1, receiver3);
+ assertNotSame(receiver2, receiver3);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testConnectionOptionsConfiguresLargestBacklogNextReceiverPolicy() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: handles 0, 1 and 2 are sent 1, 2 and 1 transfers; the connection default policy is LARGEST_BACKLOG.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.LARGEST_BACKLOG);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = connection.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = connection.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+ // receiver2 holds the most queued deliveries, so it is the expected selection (expected value goes first).
+ Receiver next = connection.nextReceiver();
+ assertSame(receiver2, next);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testSessionOptionsConfiguresLargestBacklogNextReceiverPolicy() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.LARGEST_BACKLOG);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+ Receiver next = session.nextReceiver();
+ assertSame(next, receiver2);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testUserSpeicifedNextReceiverPolicyOverridesConfiguration() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: connection default is RANDOM and session default is SMALLEST_BACKLOG, but the call passes LARGEST_BACKLOG.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+ // The explicitly supplied policy must win, so receiver2 (two queued deliveries) is the expected result.
+ Receiver next = session.nextReceiver(NextReceiverPolicy.LARGEST_BACKLOG);
+ assertSame(receiver2, next);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testSessionOptionsConfiguresSmallestBacklogNextReceiverPolicy() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: handles 0, 1 and 2 are sent 3, 2 and 1 transfers; the session default policy is SMALLEST_BACKLOG.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(4)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(5)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.SMALLEST_BACKLOG);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 3);
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+ // receiver3 holds the fewest queued deliveries, so it is the expected selection.
+ Receiver next = session.nextReceiver();
+ assertSame(receiver3, next);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesRoundRobin() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.ROUND_ROBIN); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesRandom() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.RANDOM); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesLargestBacklog() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.LARGEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesSmallestBacklog() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.SMALLEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesAfterDeliveryArrivesFirstAvailable() throws Exception {
+ doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy.FIRST_AVAILABLE); // policy becomes the connection default
+ }
+
+ public void doTestNextReceiverCompletesAfterDeliveryArrives(NextReceiverPolicy policy) throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Shared scenario: nextReceiver() is called on another thread before any transfer is sent, then the peer sends one.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final CountDownLatch done = new CountDownLatch(1);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+ // The background task counts down 'done' only after nextReceiver() and receive() have both returned.
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ Receiver receiver = connection.nextReceiver();
+ Delivery delivery = receiver.receive();
+ LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+ done.countDown();
+ } catch (ClientException e) {
+ e.printStackTrace(); // NOTE(review): failure is only printed; 'done' is never counted down and the await below fails
+ }
+ });
+ // The transfer is injected with later(15) - presumably a short delay (confirm) - after the poller was submitted.
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).later(15);
+
+ peer.waitForScriptToComplete();
+
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverThrowsAfterSessionClosedRoundRobin() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.ROUND_ROBIN); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverThrowsAfterSessionClosedRandom() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.RANDOM); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverThrowsAfterSessionClosedLargestBacklog() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.LARGEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverThrowsAfterSessionClosedSmallestBacklog() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.SMALLEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverThrowsAfterSessionClosedFirstAvailable() throws Exception {
+ doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy.FIRST_AVAILABLE); // policy becomes the connection default
+ }
+
+ public void doTestNextReceiverThrowsAfterSessionClosed(NextReceiverPolicy policy) throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+ // Shared scenario: a thread calls session.nextReceiver() with no receivers open, then the session is closed.
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(1);
+ final AtomicReference<Exception> error = new AtomicReference<>();
+
+ final Client container = Client.create();
+ final ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+ final Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ final Session session = connection.openSession().openFuture().get();
+ // 'started' is counted down just before nextReceiver() is invoked; 'done' in finally, whether or not it threw.
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ started.countDown();
+ session.nextReceiver();
+ } catch (ClientException e) {
+ error.set(e);
+ } finally {
+ done.countDown();
+ }
+ });
+
+ peer.waitForScriptToComplete();
+
+ assertTrue(started.await(10, TimeUnit.SECONDS));
+
+ peer.expectEnd().respond();
+
+ session.closeAsync();
+ // The pending call must have ended with a ClientIllegalStateException captured in 'error'.
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+ assertTrue(error.get() instanceof ClientIllegalStateException);
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateRoundRobin() throws Exception {
+ doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.ROUND_ROBIN); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateRandom() throws Exception {
+ doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.RANDOM); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateLargestBacklog() throws Exception {
+ doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.LARGEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateSmallestBacklog() throws Exception {
+ doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.SMALLEST_BACKLOG); // policy becomes the connection default
+ }
+
+ @Test
+ public void testNextReceiverCompletesWhenCalledBeforeReceiverCreateFirstAvailable() throws Exception {
+ doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy.FIRST_AVAILABLE); // policy becomes the connection default
+ }
+
+ public void doTestNextReceiverCompletesWhenCalledBeforeReceiverCreate(NextReceiverPolicy policy) throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Shared scenario: connection.nextReceiver() is called on another thread before any receiver has been opened.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch done = new CountDownLatch(1);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(policy);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+
+ ForkJoinPool.commonPool().execute(() -> {
+ try {
+ started.countDown();
+ Receiver receiver = connection.nextReceiver();
+ Delivery delivery = receiver.receive();
+ LOG.info("Next receiver returned delivery with body: {}", delivery.message().body());
+ done.countDown();
+ } catch (ClientException e) {
+ e.printStackTrace(); // NOTE(review): failure is only printed; 'done' is never counted down and the await below fails
+ }
+ });
+
+ assertTrue(started.await(10, TimeUnit.SECONDS));
+
+ connection.openFuture().get();
+ // Only after the poller has started is the receiver attach, flow and transfer scripted and the receiver opened.
+ peer.waitForScriptToComplete();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue().afterDelay(10);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ connection.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ assertTrue(done.await(10, TimeUnit.SECONDS));
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverRoundRobinReturnsNextReceiverAfterLast() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+ assertEquals(0, receiver1.queuedDeliveries());
+
+ Receiver next = session.nextReceiver();
+ assertSame(next, receiver2);
+ next = session.nextReceiver();
+ assertSame(next, receiver3);
+
+ peer.waitForScriptToComplete();
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverRoundRobinPolicyWrapsAround() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: ROUND_ROBIN session policy; after receiver2 and receiver3 are returned, a late transfer reaches handle 0.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+ assertEquals(0, receiver1.queuedDeliveries());
+
+ Receiver next = session.nextReceiver();
+ assertSame(receiver2, next);
+ next = session.nextReceiver();
+ assertSame(receiver3, next);
+
+ peer.waitForScriptToComplete();
+ // Send a late transfer to handle 0 so that the rotation is expected to wrap around to receiver1.
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).now();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+
+ next = session.nextReceiver();
+ assertSame(receiver1, next);
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverRoundRobinPolicyRestartsWhenLastReceiverClosed() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+ // Scenario: ROUND_ROBIN session policy; the receiver returned first (receiver2) is closed before the next poll.
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(1)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(2)
+ .withDeliveryId(2)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+ peer.expectDetach().respond();
+
+ Wait.waitFor(() -> receiver2.queuedDeliveries() == 2);
+ Wait.waitFor(() -> receiver3.queuedDeliveries() == 1);
+
+ assertEquals(0, receiver1.queuedDeliveries());
+
+ Receiver next = session.nextReceiver();
+ assertSame(receiver2, next);
+ next.close();
+
+ peer.waitForScriptToComplete();
+
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(3)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).now();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ // With receiver2 closed and a transfer now queued on handle 0, receiver1 is the expected result.
+ next = session.nextReceiver();
+ assertSame(receiver1, next);
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
+
+ @Test
+ public void testNextReceiverRoundRobinPolicySkipsEmptyReceivers() throws Exception {
+ byte[] payload = createEncodedMessage(new AmqpValue<String>("Hello World"));
+
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(10);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.remoteTransfer().withHandle(3)
+ .withDeliveryId(1)
+ .withMore(false)
+ .withMessageFormat(0)
+ .withPayload(payload).queue();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.RANDOM);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ SessionOptions sessionOptions = new SessionOptions().defaultNextReceiverPolicy(NextReceiverPolicy.ROUND_ROBIN);
+ Session session = connection.openSession(sessionOptions);
+
+ ReceiverOptions receiverOptions = new ReceiverOptions().creditWindow(10).autoAccept(false);
+ Receiver receiver1 = session.openReceiver("test-receiver1", receiverOptions).openFuture().get();
+ Receiver receiver2 = session.openReceiver("test-receiver2", receiverOptions).openFuture().get();
+ Receiver receiver3 = session.openReceiver("test-receiver3", receiverOptions).openFuture().get();
+ Receiver receiver4 = session.openReceiver("test-receiver4", receiverOptions).openFuture().get();
+
+ peer.waitForScriptToComplete();
+
+ Wait.waitFor(() -> receiver1.queuedDeliveries() == 1);
+ Wait.waitFor(() -> receiver4.queuedDeliveries() == 1);
+
+ assertEquals(0, receiver2.queuedDeliveries());
+ assertEquals(0, receiver3.queuedDeliveries());
+
+ Receiver next = session.nextReceiver();
+ assertSame(next, receiver1);
+ next = session.nextReceiver();
+ assertSame(next, receiver4);
+
+ peer.waitForScriptToComplete();
+ peer.expectClose().respond();
+
+ connection.close();
+
+ peer.waitForScriptToComplete();
+ }
+ }
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
index 574af9ca..2b115e35 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/Session.java
@@ -221,8 +221,25 @@ public interface Session extends Endpoint<Session> {
* @param remoteTxnManagerOpenEventHandler
* the EventHandler that will be signaled when a {@link TransactionController} link is remotely opened.
*
- * @return this session for chaining
+ * @return this {@linkplain Session} for chaining
*/
Session transactionManagerOpenHandler(EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler);
+ //----- Session monitoring handlers
+
+ /**
+ * Allows monitoring of incoming deliveries to receivers attached to this {@link Session}. The
+ * {@link Receiver} that is the target of the incoming delivery will be notified first of the
+ * incoming delivery and any processing should be done using the {@link Receiver#deliveryReadHandler(EventHandler)}.
+ * This event point will be triggered only after the {@link Receiver} level handler and should be
+ * used to monitor deliveries passing through a session for logging or other state related actions
+ * performed by the service managing this session.
+ *
+ * @param delivery
+ * The {@link EventHandler} that will be signaled with each {@link IncomingDelivery} that was read.
+ *
+ * @return this {@linkplain Session} for chaining
+ */
+ Session deliveryReadHandler(EventHandler<IncomingDelivery> delivery);
+
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
index 16fa318a..3262b9ef 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonConnection.java
@@ -19,9 +19,8 @@ package org.apache.qpid.protonj2.engine.impl;
import java.lang.ref.SoftReference;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -69,8 +68,8 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn
private Open remoteOpen;
private AMQPHeader remoteHeader;
- private Map<Integer, ProtonSession> localSessions = new HashMap<>();
- private Map<Integer, ProtonSession> remoteSessions = new HashMap<>();
+ private Map<Integer, ProtonSession> localSessions = new LinkedHashMap<>();
+ private Map<Integer, ProtonSession> remoteSessions = new LinkedHashMap<>();
// These would be sessions that were begun and ended before the remote ever
// responded with a matching being and end. The remote is required to complete
@@ -707,7 +706,7 @@ public class ProtonConnection extends ProtonEndpoint<Connection> implements Conn
if (localSessions.isEmpty() && remoteSessions.isEmpty()) {
result = Collections.EMPTY_SET;
} else {
- result = new HashSet<>(localSessions.size());
+ result = new LinkedHashSet<>(localSessions.size());
result.addAll(localSessions.values());
result.addAll(remoteSessions.values());
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
index 954a29c1..6b74d14a 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java
@@ -260,6 +260,12 @@ public class ProtonReceiver extends ProtonLink<Receiver> implements Receiver {
deliveryReadEventHandler.handle(delivery);
}
+ // Allow session owner to monitor deliveries passing through the session
+ // but only after the receiver handlers have had a chance to handle it.
+ if (session.deliveryReadHandler() != null) {
+ session.deliveryReadHandler().handle(delivery);
+ }
+
return this;
}
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
index ed72baca..a0b536fb 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonSession.java
@@ -18,15 +18,16 @@ package org.apache.qpid.protonj2.engine.impl;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.engine.ConnectionState;
import org.apache.qpid.protonj2.engine.EventHandler;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.Link;
import org.apache.qpid.protonj2.engine.LinkState;
import org.apache.qpid.protonj2.engine.Receiver;
@@ -65,8 +66,8 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
private final ProtonSessionOutgoingWindow outgoingWindow;
private final ProtonSessionIncomingWindow incomingWindow;
- private final Map<String, ProtonSender> senderByNameMap = new HashMap<>();
- private final Map<String, ProtonReceiver> receiverByNameMap = new HashMap<>();
+ private final Map<String, ProtonSender> senderByNameMap = new LinkedHashMap<>();
+ private final Map<String, ProtonReceiver> receiverByNameMap = new LinkedHashMap<>();
private final SplayMap<ProtonLink<?>> localLinks = new SplayMap<>();
private final SplayMap<ProtonLink<?>> remoteLinks = new SplayMap<>();
@@ -85,6 +86,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
private EventHandler<Sender> remoteSenderOpenEventHandler;
private EventHandler<Receiver> remoteReceiverOpenEventHandler;
private EventHandler<TransactionManager> remoteTxnManagerOpenEventHandler;
+ private EventHandler<IncomingDelivery> deliveryReadHandler;
/**
* Creates a new {@link ProtonSession} instance bound to the given {@link ProtonConnection}.
@@ -314,7 +316,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
if (senderByNameMap.isEmpty()) {
result = Collections.EMPTY_SET;
} else {
- result = new HashSet<>(senderByNameMap.values());
+ result = new LinkedHashSet<>(senderByNameMap.values());
}
return result;
@@ -328,7 +330,7 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
if (receiverByNameMap.isEmpty()) {
result = Collections.EMPTY_SET;
} else {
- result = new HashSet<>(receiverByNameMap.values());
+ result = new LinkedHashSet<>(receiverByNameMap.values());
}
return result;
@@ -449,6 +451,16 @@ public class ProtonSession extends ProtonEndpoint<Session> implements Session {
return remoteTxnManagerOpenEventHandler;
}
+ @Override
+ public ProtonSession deliveryReadHandler(EventHandler<IncomingDelivery> deliveryReadHandler) {
+ this.deliveryReadHandler = deliveryReadHandler;
+ return this;
+ }
+
+ EventHandler<IncomingDelivery> deliveryReadHandler() {
+ return deliveryReadHandler;
+ }
+
//----- Respond to Connection and Engine state changes
void handleConnectionLocallyClosed(ProtonConnection protonConnection) {
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
index 1cb5c3e2..9e5719c0 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonSessionTest.java
@@ -2588,4 +2588,41 @@ public class ProtonSessionTest extends ProtonEngineTestSupport {
assertNotNull(failure);
assertTrue(failure instanceof ProtocolViolationException);
}
+
+ @Test
+ public void testSessionWideDeliveryMonitoringHandler() throws Exception {
+ Engine engine = EngineFactory.PROTON.createNonSaslEngine();
+ engine.errorHandler(result -> failure = result.failureCause());
+ ProtonTestConnector peer = createTestPeer(engine);
+
+ final AtomicBoolean deliveryReadByReceiver = new AtomicBoolean();
+ final AtomicBoolean deliveryReadBySession = new AtomicBoolean();
+
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectOpen().respond().withContainerId("driver");
+ peer.expectBegin().respond();
+ peer.expectAttach().ofReceiver().respond();
+ peer.expectFlow().withLinkCredit(1);
+ peer.remoteTransfer().withHandle(0)
+ .withDeliveryId(0)
+ .withDeliveryTag(new byte[] {1})
+ .onChannel(0)
+ .queue();
+
+ Connection connection = engine.start().open();
+ Session session = connection.session().open();
+
+ session.deliveryReadHandler((delivery) -> deliveryReadBySession.set(true));
+
+ Receiver receiver = session.receiver("test");
+ receiver.deliveryReadHandler((delivery) -> deliveryReadByReceiver.set(true));
+ receiver.open().addCredit(1);
+
+ peer.waitForScriptToComplete();
+
+ assertTrue(deliveryReadByReceiver.get());
+ assertTrue(deliveryReadBySession.get());
+
+ assertNull(failure);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org