You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/11 21:42:39 UTC
[qpid-protonj2] branch main updated: PROTON-2541 Further refine the internal client implementation
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 2d3bbaf5 PROTON-2541 Further refine the internal client implementation
2d3bbaf5 is described below
commit 2d3bbaf5ad300afc0b3c6f25d9ca0dfcdc905578
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 11 17:42:29 2022 -0400
PROTON-2541 Further refine the internal client implementation
Provide some additional abstractions internally to reduce duplicated
code amongst tracker types and sender and receiver link implementations.
---
.../protonj2/client/impl/ClientConnection.java | 2 +-
.../client/impl/ClientConnectionCapabilities.java | 2 +-
.../client/impl/ClientExceptionSupport.java | 2 +-
.../client/impl/ClientNoOpStreamTracker.java | 2 +-
.../protonj2/client/impl/ClientNoOpTracker.java | 2 +-
.../qpid/protonj2/client/impl/ClientReceiver.java | 16 +-
.../client/impl/ClientReceiverLinkType.java | 60 +++++++
.../qpid/protonj2/client/impl/ClientSender.java | 25 +--
.../protonj2/client/impl/ClientSenderLinkType.java | 43 ++++-
.../protonj2/client/impl/ClientStreamReceiver.java | 14 +-
.../protonj2/client/impl/ClientStreamSender.java | 24 +--
.../protonj2/client/impl/ClientStreamTracker.java | 171 +-------------------
.../{ClientTracker.java => ClientTrackable.java} | 73 ++++-----
.../qpid/protonj2/client/impl/ClientTracker.java | 174 +--------------------
.../client/impl/ClientTransactionContext.java | 2 +-
.../protonj2/client/impl/StreamSenderTest.java | 4 +-
16 files changed, 157 insertions(+), 459 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index 9715ecf6..c6765dfc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
/**
* A {@link Connection} implementation that uses the Proton engine for AMQP protocol support.
*/
-public class ClientConnection implements Connection {
+public final class ClientConnection implements Connection {
private static final Logger LOG = LoggerFactory.getLogger(ClientConnection.class);
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
index 3570edbd..df35d023 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
@@ -27,7 +27,7 @@ import org.apache.qpid.protonj2.types.Symbol;
* Tracks available known capabilities for the connection to allow the client
* to know what features are supported on the current connection.
*/
-public class ClientConnectionCapabilities {
+public final class ClientConnectionCapabilities {
private boolean anonymousRelaySupported;
private boolean delayedDeliverySupported;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
index 60756c11..94c76cd1 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
@@ -42,7 +42,7 @@ import org.apache.qpid.protonj2.types.transport.ConnectionError;
import org.apache.qpid.protonj2.types.transport.ErrorCondition;
import org.apache.qpid.protonj2.types.transport.LinkError;
-class ClientExceptionSupport {
+final class ClientExceptionSupport {
/**
* Checks the given cause to determine if it's already a ClientIOException type and
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
index 493f475f..c7417c36 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
@@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
* A dummy Tracker instance that always indicates remote settlement and
* acceptance for {@link StreamSender} instances.
*/
-public class ClientNoOpStreamTracker implements StreamTracker {
+public final class ClientNoOpStreamTracker implements StreamTracker {
private final ClientStreamSender sender;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
index 0d942ed1..b4d5c2e0 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
@@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
* A dummy Tracker instance that always indicates remote settlement and
* acceptance.
*/
-public class ClientNoOpTracker implements Tracker {
+public final class ClientNoOpTracker implements Tracker {
private final ClientSender sender;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index b13e1fd3..e025d35b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -17,7 +17,6 @@
package org.apache.qpid.protonj2.client.impl;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* Client {@link Receiver} implementation.
*/
-public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qpid.protonj2.engine.Receiver> implements Receiver {
+public final class ClientReceiver extends ClientReceiverLinkType<Receiver> implements Receiver {
private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
@@ -48,17 +47,12 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
private ScheduledFuture<?> drainingTimeout;
private final ReceiverOptions options;
- private final ScheduledExecutorService executor;
private final FifoDeliveryQueue messageQueue;
- private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
-
ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
- super(session, receiverId, options);
+ super(session, receiverId, options, receiver);
this.options = options;
- this.executor = session.getScheduler();
- this.protonReceiver = receiver.setLinkedResource(this);
if (options.creditWindow() > 0) {
protonReceiver.addCredit(options.creditWindow());
@@ -177,6 +171,7 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
//----- Internal API for the ClientReceiver and other Client objects
+ @Override
void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
checkClosedOrFailed();
asyncApplyDisposition(delivery, state, settle);
@@ -192,11 +187,6 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
return this;
}
- @Override
- protected org.apache.qpid.protonj2.engine.Receiver protonLink() {
- return protonReceiver;
- }
-
//----- Handlers for proton receiver events
private void handleDeliveryReceived(IncomingDelivery delivery) {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
new file mode 100644
index 00000000..a2a00fa7
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import org.apache.qpid.protonj2.client.Link;
+import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.engine.Receiver;
+import org.apache.qpid.protonj2.types.transport.DeliveryState;
+
+/**
+ * Base class for client link types that wrap a proton receiver to provide
+ * delivery dispatch in some manner.
+ */
+public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> {
+
+ protected Receiver protonReceiver;
+
+ protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) {
+ super(session, linkId, options);
+
+ this.protonReceiver = protonReceiver;
+ }
+
+ @Override
+ protected org.apache.qpid.protonj2.engine.Receiver protonLink() {
+ return protonReceiver;
+ }
+
+ /**
+ * Apply the given disposition and settlement state to the given incoming delivery instance.
+ *
+ * @param delivery
+ * The incoming delivery that will be acted upon
+ * @param state
+ * The delivery state to apply to the given incoming delivery
+ * @param settle
+ * The settlement state to apply to the given incoming delivery
+ *
+ * @throws ClientException if an error occurs while applying the disposition to the delivery.
+ */
+ abstract void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException;
+
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index f449d400..7ff4849b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -42,22 +42,17 @@ import org.slf4j.LoggerFactory;
/**
* Proton based AMQP Sender
*/
-final class ClientSender extends ClientSenderLinkType<Sender> implements Sender {
+public final class ClientSender extends ClientSenderLinkType<Sender> implements Sender {
private static final Logger LOG = LoggerFactory.getLogger(ClientSender.class);
- private final boolean sendsSettled;
private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
private final SenderOptions options;
- private org.apache.qpid.protonj2.engine.Sender protonSender;
-
ClientSender(ClientSession session, SenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
- super(session, senderId, options);
+ super(session, senderId, options, protonSender);
this.options = new SenderOptions(options);
- this.protonSender = protonSender.setLinkedResource(this);
- this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
}
@Override
@@ -90,6 +85,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
return this.options;
}
+ @Override
void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
checkClosedOrFailed();
executor.execute(() -> {
@@ -97,14 +93,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
});
}
- org.apache.qpid.protonj2.engine.Sender getProtonSender() {
- return protonSender;
- }
-
- boolean isSendingSettled() {
- return sendsSettled;
- }
-
//----- Handlers for proton sender events
private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) {
@@ -141,11 +129,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
return this;
}
- @Override
- protected org.apache.qpid.protonj2.engine.Sender protonLink() {
- return protonSender;
- }
-
private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
@@ -352,7 +335,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
@Override
public void send(DeliveryState state, boolean settled) {
if (delivery == null) {
- delivery = sender.getProtonSender().next();
+ delivery = sender.protonLink().next();
delivery.setLinkedResource(sender.createTracker(delivery));
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
index afc1ef51..c1edd783 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
@@ -19,9 +19,13 @@ package org.apache.qpid.protonj2.client.impl;
import org.apache.qpid.protonj2.client.Link;
import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
import org.apache.qpid.protonj2.engine.LinkState;
+import org.apache.qpid.protonj2.engine.OutgoingDelivery;
import org.apache.qpid.protonj2.engine.Sender;
+import org.apache.qpid.protonj2.types.transport.DeliveryState;
+import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
/**
* Base type for all the proton client sender types which provides a few extra
@@ -29,17 +33,50 @@ import org.apache.qpid.protonj2.engine.Sender;
*/
public abstract class ClientSenderLinkType<LinkType extends Link<LinkType>> extends ClientLinkType<LinkType, Sender> {
- protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options) {
+ private final boolean sendsSettled;
+
+ protected Sender protonSender;
+
+ protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options, Sender protonSender) {
super(session, linkId, options);
+
+ this.protonSender = protonSender;
+ this.protonSender = protonSender.setLinkedResource(self());
+ this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
}
final boolean isAnonymous() {
- return protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
+ return protonSender.<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
+ }
+
+ final boolean isSendingSettled() {
+ return sendsSettled;
+ }
+
+ @Override
+ final protected Sender protonLink() {
+ return protonSender;
}
final void handleAnonymousRelayNotSupported() {
- if (isAnonymous() && protonLink().getState() == LinkState.IDLE) {
+ if (isAnonymous() && protonSender.getState() == LinkState.IDLE) {
immediateLinkShutdown(new ClientUnsupportedOperationException("Anonymous relay support not available from this connection"));
}
}
+
+ /**
+ * Provides a common API point for sender link types to allow resources to process and then
+ * apply dispositions to outgoing delivery types.
+ *
+ * @param delivery
+ * The outgoing delivery to apply the disposition to
+ * @param state
+ * The delivery state to apply to the outgoing delivery.
+ * @param settled
+ * Should the outgoing delivery be settled as part of the disposition.
+ *
+ * @throws ClientException if an error occurs while applying the disposition.
+ */
+ abstract void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException;
+
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 8b249f9c..9ab35c81 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -33,7 +33,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutExcepti
import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
-import org.apache.qpid.protonj2.engine.Receiver;
import org.apache.qpid.protonj2.types.messaging.Released;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
import org.slf4j.Logger;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
/**
* Client implementation of a {@link StreamReceiver}.
*/
-public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, org.apache.qpid.protonj2.engine.Receiver> implements StreamReceiver {
+public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver {
private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
@@ -51,13 +50,10 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
private final StreamReceiverOptions options;
private final Map<ClientFuture<StreamDelivery>, ScheduledFuture<?>> receiveRequests = new LinkedHashMap<>();
- private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
-
ClientStreamReceiver(ClientSession session, StreamReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
- super(session, receiverId, options);
+ super(session, receiverId, options, receiver);
this.options = options;
- this.protonReceiver = receiver.setLinkedResource(this);
if (options.creditWindow() > 0) {
protonReceiver.addCredit(options.creditWindow());
@@ -259,6 +255,7 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
//----- Private implementation details
+ @Override
void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
checkClosedOrFailed();
asyncApplyDisposition(delivery, state, settle);
@@ -341,11 +338,6 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
return this;
}
- @Override
- protected Receiver protonLink() {
- return protonReceiver;
- }
-
@Override
protected void linkSpecificLocalOpenHandler() {
protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 2513098e..511f9028 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -36,7 +36,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.client.futures.ClientSynchronization;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
-import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.util.StringUtils;
import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -47,22 +46,17 @@ import org.slf4j.LoggerFactory;
/**
* Client implementation of a {@link StreamSender}.
*/
-public final class ClientStreamSender extends ClientLinkType<StreamSender, Sender> implements StreamSender {
+public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> implements StreamSender {
private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class);
private final StreamSenderOptions options;
- private final boolean sendsSettled;
private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
- private org.apache.qpid.protonj2.engine.Sender protonSender;
-
ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
- super(session, senderId, options);
+ super(session, senderId, options, protonSender);
this.options = new StreamSenderOptions(options);
- this.protonSender = protonSender.setLinkedResource(this);
- this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
}
@Override
@@ -127,10 +121,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
//----- Internal API
- boolean isSendingSettled() {
- return sendsSettled;
- }
-
StreamSenderOptions options() {
return this.options;
}
@@ -140,15 +130,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
return this;
}
- Sender getProtonSender() {
- return protonSender;
- }
-
- @Override
- protected Sender protonLink() {
- return protonSender;
- }
-
private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
send.sendTimeout(executor.schedule(() -> {
@@ -227,6 +208,7 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
return new ClientNoOpStreamTracker(this);
}
+ @Override
void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
checkClosedOrFailed();
executor.execute(() -> {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
index 0d5c3736..f6b17fbc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
@@ -16,42 +16,18 @@
*/
package org.apache.qpid.protonj2.client.impl;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.StreamSender;
import org.apache.qpid.protonj2.client.StreamTracker;
-import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
-import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
/**
* {@link StreamTracker} implementation that relies on the {@link ClientTrackable} base type to
* handle the basic {@link OutgoingDelivery} management.
*/
-public final class ClientStreamTracker implements StreamTracker {
-
- private final ClientStreamSender sender;
- private final OutgoingDelivery delivery;
-
- private final ClientFuture<StreamTracker> remoteSettlementFuture;
-
- private volatile boolean remotelySettled;
- private volatile DeliveryState remoteDeliveryState;
+public final class ClientStreamTracker extends ClientTrackable<ClientStreamSender, StreamTracker> implements StreamTracker {
ClientStreamTracker(ClientStreamSender sender, OutgoingDelivery delivery) {
- this.sender = sender;
- this.delivery = delivery;
- this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
- this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
- }
-
- OutgoingDelivery delivery() {
- return delivery;
+ super(sender, delivery);
}
@Override
@@ -60,148 +36,7 @@ public final class ClientStreamTracker implements StreamTracker {
}
@Override
- public synchronized DeliveryState state() {
- return ClientDeliveryState.fromProtonType(delivery.getState());
- }
-
- @Override
- public DeliveryState remoteState() {
- return remoteDeliveryState;
- }
-
- @Override
- public boolean remoteSettled() {
- return remotelySettled;
- }
-
- @Override
- public StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException {
- try {
- sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
- } finally {
- if (settle) {
- remoteSettlementFuture.complete(this);
- }
- }
-
+ protected StreamTracker self() {
return this;
}
-
- @Override
- public StreamTracker settle() throws ClientException {
- try {
- sender.disposition(delivery, null, true);
- } finally {
- remoteSettlementFuture.complete(this);
- }
-
- return this;
- }
-
- @Override
- public synchronized boolean settled() {
- return delivery.isSettled();
- }
-
- @Override
- public ClientFuture<StreamTracker> settlementFuture() {
- if (delivery.isSettled()) {
- remoteSettlementFuture.complete(this);
- }
-
- return remoteSettlementFuture;
- }
-
- @Override
- public StreamTracker awaitSettlement() throws ClientException {
- try {
- if (settled()) {
- return this;
- } else {
- return settlementFuture().get();
- }
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new ClientException("Wait for settlement was interrupted", e);
- }
- }
-
- @Override
- public StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
- try {
- if (settled()) {
- return this;
- } else {
- return settlementFuture().get(timeout, unit);
- }
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for settlement was interrupted", ie);
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (TimeoutException te) {
- throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
- }
- }
-
- @Override
- public StreamTracker awaitAccepted() throws ClientException {
- try {
- if (settled() && !remoteSettled()) {
- return this;
- } else {
- settlementFuture().get();
- if (remoteState() != null && remoteState().isAccepted()) {
- return this;
- } else {
- throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
- }
- }
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for Accepted outcome was interrupted", ie);
- }
- }
-
- @Override
- public StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
- try {
- if (settled() && !remoteSettled()) {
- return this;
- } else {
- settlementFuture().get(timeout, unit);
- if (remoteState() != null && remoteState().isAccepted()) {
- return this;
- } else {
- throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
- }
- }
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for Accepted outcome was interrupted", ie);
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (TimeoutException te) {
- throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
- }
- }
-
- //----- Internal Event hooks for delivery updates
-
- private void processDeliveryUpdated(OutgoingDelivery delivery) {
- remotelySettled = delivery.isRemotelySettled();
- remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
- if (delivery.isRemotelySettled()) {
- remoteSettlementFuture.complete(this);
- }
-
- if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
- delivery.settle();
- }
- }
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
similarity index 78%
copy from protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
copy to protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
index 862bf6a4..0fb08837 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.qpid.protonj2.client.impl;
import java.util.concurrent.ExecutionException;
@@ -21,8 +22,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.qpid.protonj2.client.DeliveryState;
-import org.apache.qpid.protonj2.client.Sender;
-import org.apache.qpid.protonj2.client.Tracker;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
@@ -30,14 +29,14 @@ import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
/**
- * Client outgoing delivery tracker object.
+ * Base type used to provide some common plumbing for Tracker types
*/
-class ClientTracker implements Tracker {
+public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> {
- private final ClientSender sender;
- private final OutgoingDelivery delivery;
+ protected final SenderType sender;
+ protected final OutgoingDelivery delivery;
- private final ClientFuture<Tracker> remoteSettlementFuture;
+ private final ClientFuture<TrackerType> remoteSettlementFuture;
private volatile boolean remotelySettled;
private volatile DeliveryState remoteDeliveryState;
@@ -50,80 +49,69 @@ class ClientTracker implements Tracker {
* @param delivery
* The proton outgoing delivery object that backs this tracker.
*/
- ClientTracker(ClientSender sender, OutgoingDelivery delivery) {
+ ClientTrackable(SenderType sender, OutgoingDelivery delivery) {
this.sender = sender;
this.delivery = delivery;
this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
}
+ protected abstract TrackerType self();
+
OutgoingDelivery delivery() {
return delivery;
}
- @Override
- public Sender sender() {
- return sender;
- }
-
- @Override
public synchronized DeliveryState state() {
return ClientDeliveryState.fromProtonType(delivery.getState());
}
- @Override
public DeliveryState remoteState() {
return remoteDeliveryState;
}
- @Override
public boolean remoteSettled() {
return remotelySettled;
}
- @Override
- public Tracker disposition(DeliveryState state, boolean settle) throws ClientException {
+ public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException {
try {
sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
} finally {
if (settle) {
- remoteSettlementFuture.complete(this);
+ remoteSettlementFuture.complete(self());
}
}
- return this;
+ return self();
}
- @Override
- public Tracker settle() throws ClientException {
+ public TrackerType settle() throws ClientException {
try {
sender.disposition(delivery, null, true);
} finally {
- remoteSettlementFuture.complete(this);
+ remoteSettlementFuture.complete(self());
}
- return this;
+ return self();
}
- @Override
public synchronized boolean settled() {
return delivery.isSettled();
}
- @Override
- public ClientFuture<Tracker> settlementFuture() {
+ public ClientFuture<TrackerType> settlementFuture() {
if (delivery.isSettled()) {
- remoteSettlementFuture.complete(this);
+ remoteSettlementFuture.complete(self());
}
return remoteSettlementFuture;
}
- @Override
- public Tracker awaitSettlement() throws ClientException {
+ public TrackerType awaitSettlement() throws ClientException {
try {
if (settled()) {
- return this;
+ return self();
} else {
return settlementFuture().get();
}
@@ -135,11 +123,10 @@ class ClientTracker implements Tracker {
}
}
- @Override
- public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
+ public TrackerType awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
try {
if (settled()) {
- return this;
+ return self();
} else {
return settlementFuture().get(timeout, unit);
}
@@ -153,15 +140,14 @@ class ClientTracker implements Tracker {
}
}
- @Override
- public Tracker awaitAccepted() throws ClientException {
+ public TrackerType awaitAccepted() throws ClientException {
try {
if (settled() && !remoteSettled()) {
- return this;
+ return self();
} else {
settlementFuture().get();
if (remoteState() != null && remoteState().isAccepted()) {
- return this;
+ return self();
} else {
throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
}
@@ -174,15 +160,14 @@ class ClientTracker implements Tracker {
}
}
- @Override
- public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
+ public TrackerType awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
try {
if (settled() && !remoteSettled()) {
- return this;
+ return self();
} else {
settlementFuture().get(timeout, unit);
if (remoteState() != null && remoteState().isAccepted()) {
- return this;
+ return self();
} else {
throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
}
@@ -204,10 +189,10 @@ class ClientTracker implements Tracker {
remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
if (delivery.isRemotelySettled()) {
- remoteSettlementFuture.complete(this);
+ remoteSettlementFuture.complete(self());
}
- if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
+ if (sender.options.autoSettle() && delivery.isRemotelySettled()) {
delivery.settle();
}
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
index 862bf6a4..23b9911f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
@@ -16,31 +16,13 @@
*/
package org.apache.qpid.protonj2.client.impl;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.protonj2.client.DeliveryState;
-import org.apache.qpid.protonj2.client.Sender;
import org.apache.qpid.protonj2.client.Tracker;
-import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
-import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.futures.ClientFuture;
import org.apache.qpid.protonj2.engine.OutgoingDelivery;
/**
* Client outgoing delivery tracker object.
*/
-class ClientTracker implements Tracker {
-
- private final ClientSender sender;
- private final OutgoingDelivery delivery;
-
- private final ClientFuture<Tracker> remoteSettlementFuture;
-
- private volatile boolean remotelySettled;
- private volatile DeliveryState remoteDeliveryState;
+public final class ClientTracker extends ClientTrackable<ClientSender, Tracker> implements Tracker {
/**
* Create an instance of a client outgoing delivery tracker.
@@ -51,164 +33,16 @@ class ClientTracker implements Tracker {
* The proton outgoing delivery object that backs this tracker.
*/
ClientTracker(ClientSender sender, OutgoingDelivery delivery) {
- this.sender = sender;
- this.delivery = delivery;
- this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
- this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
- }
-
- OutgoingDelivery delivery() {
- return delivery;
+ super(sender, delivery);
}
@Override
- public Sender sender() {
+ public ClientSender sender() {
return sender;
}
@Override
- public synchronized DeliveryState state() {
- return ClientDeliveryState.fromProtonType(delivery.getState());
- }
-
- @Override
- public DeliveryState remoteState() {
- return remoteDeliveryState;
- }
-
- @Override
- public boolean remoteSettled() {
- return remotelySettled;
- }
-
- @Override
- public Tracker disposition(DeliveryState state, boolean settle) throws ClientException {
- try {
- sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
- } finally {
- if (settle) {
- remoteSettlementFuture.complete(this);
- }
- }
-
+ protected Tracker self() {
return this;
}
-
- @Override
- public Tracker settle() throws ClientException {
- try {
- sender.disposition(delivery, null, true);
- } finally {
- remoteSettlementFuture.complete(this);
- }
-
- return this;
- }
-
- @Override
- public synchronized boolean settled() {
- return delivery.isSettled();
- }
-
- @Override
- public ClientFuture<Tracker> settlementFuture() {
- if (delivery.isSettled()) {
- remoteSettlementFuture.complete(this);
- }
-
- return remoteSettlementFuture;
- }
-
- @Override
- public Tracker awaitSettlement() throws ClientException {
- try {
- if (settled()) {
- return this;
- } else {
- return settlementFuture().get();
- }
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new ClientException("Wait for settlement was interrupted", e);
- }
- }
-
- @Override
- public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
- try {
- if (settled()) {
- return this;
- } else {
- return settlementFuture().get(timeout, unit);
- }
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for settlement was interrupted", ie);
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (TimeoutException te) {
- throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
- }
- }
-
- @Override
- public Tracker awaitAccepted() throws ClientException {
- try {
- if (settled() && !remoteSettled()) {
- return this;
- } else {
- settlementFuture().get();
- if (remoteState() != null && remoteState().isAccepted()) {
- return this;
- } else {
- throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
- }
- }
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for Accepted outcome was interrupted", ie);
- }
- }
-
- @Override
- public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
- try {
- if (settled() && !remoteSettled()) {
- return this;
- } else {
- settlementFuture().get(timeout, unit);
- if (remoteState() != null && remoteState().isAccepted()) {
- return this;
- } else {
- throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
- }
- }
- } catch (InterruptedException ie) {
- Thread.interrupted();
- throw new ClientException("Wait for Accepted outcome was interrupted", ie);
- } catch (ExecutionException exe) {
- throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
- } catch (TimeoutException te) {
- throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
- }
- }
-
- //----- Internal Event hooks for delivery updates
-
- private void processDeliveryUpdated(OutgoingDelivery delivery) {
- remotelySettled = delivery.isRemotelySettled();
- remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
- if (delivery.isRemotelySettled()) {
- remoteSettlementFuture.complete(this);
- }
-
- if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
- delivery.settle();
- }
- }
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
index 0ca1bf38..6f58d242 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
@@ -28,7 +28,7 @@ import org.apache.qpid.protonj2.types.transport.DeliveryState;
* to mask from the senders and receivers the work of deciding transaction
* specific behaviors.
*/
-public interface ClientTransactionContext {
+interface ClientTransactionContext {
public interface Sendable {
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 338cc178..5d0b7a84 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -245,7 +245,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
senderOptions.targetOptions().capabilities("queue");
ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
- assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity());
+ assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
sender.openFuture().get();
sender.close();
@@ -282,7 +282,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
senderOptions.targetOptions().capabilities("queue");
ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
- assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity());
+ assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
sender.openFuture().get();
sender.close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org