You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/11 21:42:39 UTC

[qpid-protonj2] branch main updated: PROTON-2541 Further refine the internal client implementation

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d3bbaf5 PROTON-2541 Further refine the internal client implementation
2d3bbaf5 is described below

commit 2d3bbaf5ad300afc0b3c6f25d9ca0dfcdc905578
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 11 17:42:29 2022 -0400

    PROTON-2541 Further refine the internal client implementation
    
    Provide some additional abstractions internally to reduce duplicated
    code amongst tracker types and sender and receiver link implementations.
---
 .../protonj2/client/impl/ClientConnection.java     |   2 +-
 .../client/impl/ClientConnectionCapabilities.java  |   2 +-
 .../client/impl/ClientExceptionSupport.java        |   2 +-
 .../client/impl/ClientNoOpStreamTracker.java       |   2 +-
 .../protonj2/client/impl/ClientNoOpTracker.java    |   2 +-
 .../qpid/protonj2/client/impl/ClientReceiver.java  |  16 +-
 .../client/impl/ClientReceiverLinkType.java        |  60 +++++++
 .../qpid/protonj2/client/impl/ClientSender.java    |  25 +--
 .../protonj2/client/impl/ClientSenderLinkType.java |  43 ++++-
 .../protonj2/client/impl/ClientStreamReceiver.java |  14 +-
 .../protonj2/client/impl/ClientStreamSender.java   |  24 +--
 .../protonj2/client/impl/ClientStreamTracker.java  | 171 +-------------------
 .../{ClientTracker.java => ClientTrackable.java}   |  73 ++++-----
 .../qpid/protonj2/client/impl/ClientTracker.java   | 174 +--------------------
 .../client/impl/ClientTransactionContext.java      |   2 +-
 .../protonj2/client/impl/StreamSenderTest.java     |   4 +-
 16 files changed, 157 insertions(+), 459 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index 9715ecf6..c6765dfc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A {@link Connection} implementation that uses the Proton engine for AMQP protocol support.
  */
-public class ClientConnection implements Connection {
+public final class ClientConnection implements Connection {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientConnection.class);
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
index 3570edbd..df35d023 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnectionCapabilities.java
@@ -27,7 +27,7 @@ import org.apache.qpid.protonj2.types.Symbol;
  * Tracks available known capabilities for the connection to allow the client
  * to know what features are supported on the current connection.
  */
-public class ClientConnectionCapabilities {
+public final class ClientConnectionCapabilities {
 
     private boolean anonymousRelaySupported;
     private boolean delayedDeliverySupported;
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
index 60756c11..94c76cd1 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientExceptionSupport.java
@@ -42,7 +42,7 @@ import org.apache.qpid.protonj2.types.transport.ConnectionError;
 import org.apache.qpid.protonj2.types.transport.ErrorCondition;
 import org.apache.qpid.protonj2.types.transport.LinkError;
 
-class ClientExceptionSupport {
+final class ClientExceptionSupport {
 
     /**
      * Checks the given cause to determine if it's already an ClientIOException type and
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
index 493f475f..c7417c36 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
@@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
  * A dummy Tracker instance that always indicates remote settlement and
  * acceptance for {@link StreamSender} instances.
  */
-public class ClientNoOpStreamTracker implements StreamTracker {
+public final class ClientNoOpStreamTracker implements StreamTracker {
 
     private final ClientStreamSender sender;
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
index 0d942ed1..b4d5c2e0 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java
@@ -29,7 +29,7 @@ import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
  * A dummy Tracker instance that always indicates remote settlement and
  * acceptance.
  */
-public class ClientNoOpTracker implements Tracker {
+public final class ClientNoOpTracker implements Tracker {
 
     private final ClientSender sender;
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index b13e1fd3..e025d35b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -17,7 +17,6 @@
 package org.apache.qpid.protonj2.client.impl;
 
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -40,7 +39,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Client {@link Receiver} implementation.
  */
-public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qpid.protonj2.engine.Receiver> implements Receiver {
+public final class ClientReceiver extends ClientReceiverLinkType<Receiver> implements Receiver {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
 
@@ -48,17 +47,12 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
     private ScheduledFuture<?> drainingTimeout;
 
     private final ReceiverOptions options;
-    private final ScheduledExecutorService executor;
     private final FifoDeliveryQueue messageQueue;
 
-    private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
-
     ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
-        super(session, receiverId, options);
+        super(session, receiverId, options, receiver);
 
         this.options = options;
-        this.executor = session.getScheduler();
-        this.protonReceiver = receiver.setLinkedResource(this);
 
         if (options.creditWindow() > 0) {
             protonReceiver.addCredit(options.creditWindow());
@@ -177,6 +171,7 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
 
     //----- Internal API for the ClientReceiver and other Client objects
 
+    @Override
     void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
         checkClosedOrFailed();
         asyncApplyDisposition(delivery, state, settle);
@@ -192,11 +187,6 @@ public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qp
         return this;
     }
 
-    @Override
-    protected org.apache.qpid.protonj2.engine.Receiver protonLink() {
-        return protonReceiver;
-    }
-
     //----- Handlers for proton receiver events
 
     private void handleDeliveryReceived(IncomingDelivery delivery) {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
new file mode 100644
index 00000000..a2a00fa7
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverLinkType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import org.apache.qpid.protonj2.client.Link;
+import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.engine.Receiver;
+import org.apache.qpid.protonj2.types.transport.DeliveryState;
+
+/**
+ * Base class for client link types that wrap a proton receiver to provide
+ * delivery dispatch in some manner.
+ */
+public abstract class ClientReceiverLinkType<ReceiverType extends Link<ReceiverType>> extends ClientLinkType<ReceiverType, Receiver> {
+
+    protected Receiver protonReceiver;
+
+    protected ClientReceiverLinkType(ClientSession session, String linkId, LinkOptions<?> options, Receiver protonReceiver) {
+        super(session, linkId, options);
+
+        this.protonReceiver = protonReceiver;
+    }
+
+    @Override
+    protected org.apache.qpid.protonj2.engine.Receiver protonLink() {
+        return protonReceiver;
+    }
+
+    /**
+     * Apply the given disposition and settlement state to the given incoming delivery instance.
+     *
+     * @param delivery
+     * 		The incoming delivery that will be acted upon
+     * @param state
+     * 		The delivery state to apply to the given incoming delivery
+     * @param settle
+     * 		The settlement state to apply to the given incoming delivery
+     *
+     * @throws ClientException if an error occurs while applying the disposition to the delivery.
+     */
+    abstract void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException;
+
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index f449d400..7ff4849b 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -42,22 +42,17 @@ import org.slf4j.LoggerFactory;
 /**
  * Proton based AMQP Sender
  */
-final class ClientSender extends ClientSenderLinkType<Sender> implements Sender {
+public final class ClientSender extends ClientSenderLinkType<Sender> implements Sender {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientSender.class);
 
-    private final boolean sendsSettled;
     private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
     private final SenderOptions options;
 
-    private org.apache.qpid.protonj2.engine.Sender protonSender;
-
     ClientSender(ClientSession session, SenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
-        super(session, senderId, options);
+        super(session, senderId, options, protonSender);
 
         this.options = new SenderOptions(options);
-        this.protonSender = protonSender.setLinkedResource(this);
-        this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     @Override
@@ -90,6 +85,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
         return this.options;
     }
 
+    @Override
     void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
         checkClosedOrFailed();
         executor.execute(() -> {
@@ -97,14 +93,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
         });
     }
 
-    org.apache.qpid.protonj2.engine.Sender getProtonSender() {
-        return protonSender;
-    }
-
-    boolean isSendingSettled() {
-        return sendsSettled;
-    }
-
     //----- Handlers for proton receiver events
 
     private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) {
@@ -141,11 +129,6 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
         return this;
     }
 
-    @Override
-    protected org.apache.qpid.protonj2.engine.Sender protonLink() {
-        return protonSender;
-    }
-
     private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
@@ -352,7 +335,7 @@ final class ClientSender extends ClientSenderLinkType<Sender> implements Sender
         @Override
         public void send(DeliveryState state, boolean settled) {
             if (delivery == null) {
-                delivery = sender.getProtonSender().next();
+                delivery = sender.protonLink().next();
                 delivery.setLinkedResource(sender.createTracker(delivery));
             }
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
index afc1ef51..c1edd783 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
@@ -19,9 +19,13 @@ package org.apache.qpid.protonj2.client.impl;
 
 import org.apache.qpid.protonj2.client.Link;
 import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.engine.LinkState;
+import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 import org.apache.qpid.protonj2.engine.Sender;
+import org.apache.qpid.protonj2.types.transport.DeliveryState;
+import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
 
 /**
  * Base type for all the proton client sender types which provides a few extra
@@ -29,17 +33,50 @@ import org.apache.qpid.protonj2.engine.Sender;
  */
 public abstract class ClientSenderLinkType<LinkType extends Link<LinkType>> extends ClientLinkType<LinkType, Sender> {
 
-    protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options) {
+    private final boolean sendsSettled;
+
+    protected Sender protonSender;
+
+    protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options, Sender protonSender) {
         super(session, linkId, options);
+
+        this.protonSender = protonSender;
+        this.protonSender = protonSender.setLinkedResource(self());
+        this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     final boolean isAnonymous() {
-        return protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
+        return protonSender.<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
+    }
+
+    final boolean isSendingSettled() {
+        return sendsSettled;
+    }
+
+    @Override
+    final protected Sender protonLink() {
+        return protonSender;
     }
 
     final void handleAnonymousRelayNotSupported() {
-        if (isAnonymous() && protonLink().getState() == LinkState.IDLE) {
+        if (isAnonymous() && protonSender.getState() == LinkState.IDLE) {
             immediateLinkShutdown(new ClientUnsupportedOperationException("Anonymous relay support not available from this connection"));
         }
     }
+
+    /**
+     * Provides a common API point for sender link types to allow resources to process and then
+     * apply dispositions to outgoing delivery types.
+     *
+     * @param delivery
+     * 		The outgoing delivery to apply the disposition to
+     * @param state
+     * 		The delivery state to apply to the outgoing delivery.
+     * @param settled
+     * 		Should the outgoing delivery be settled as part of the disposition.
+     *
+     * @throws ClientException if an error occurs while applying the disposition.
+     */
+    abstract void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException;
+
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index 8b249f9c..9ab35c81 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -33,7 +33,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutExcepti
 import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
-import org.apache.qpid.protonj2.engine.Receiver;
 import org.apache.qpid.protonj2.types.messaging.Released;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
 import org.slf4j.Logger;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Client implementation of a {@link StreamReceiver}.
  */
-public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, org.apache.qpid.protonj2.engine.Receiver> implements StreamReceiver {
+public final class ClientStreamReceiver extends ClientReceiverLinkType<StreamReceiver> implements StreamReceiver {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
 
@@ -51,13 +50,10 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
     private final StreamReceiverOptions options;
     private final Map<ClientFuture<StreamDelivery>, ScheduledFuture<?>> receiveRequests = new LinkedHashMap<>();
 
-    private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
-
     ClientStreamReceiver(ClientSession session, StreamReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
-        super(session, receiverId, options);
+        super(session, receiverId, options, receiver);
 
         this.options = options;
-        this.protonReceiver = receiver.setLinkedResource(this);
 
         if (options.creditWindow() > 0) {
             protonReceiver.addCredit(options.creditWindow());
@@ -259,6 +255,7 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
 
     //----- Private implementation details
 
+    @Override
     void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
         checkClosedOrFailed();
         asyncApplyDisposition(delivery, state, settle);
@@ -341,11 +338,6 @@ public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, o
         return this;
     }
 
-    @Override
-    protected Receiver protonLink() {
-        return protonReceiver;
-    }
-
     @Override
     protected void linkSpecificLocalOpenHandler() {
         protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 2513098e..511f9028 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -36,7 +36,6 @@ import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.client.futures.ClientSynchronization;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
-import org.apache.qpid.protonj2.engine.Sender;
 import org.apache.qpid.protonj2.engine.util.StringUtils;
 import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
@@ -47,22 +46,17 @@ import org.slf4j.LoggerFactory;
 /**
  * Client implementation of a {@link StreamSender}.
  */
-public final class ClientStreamSender extends ClientLinkType<StreamSender, Sender> implements StreamSender {
+public final class ClientStreamSender extends ClientSenderLinkType<StreamSender> implements StreamSender {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class);
 
     private final StreamSenderOptions options;
-    private final boolean sendsSettled;
     private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
 
-    private org.apache.qpid.protonj2.engine.Sender protonSender;
-
     ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
-        super(session, senderId, options);
+        super(session, senderId, options, protonSender);
 
         this.options = new StreamSenderOptions(options);
-        this.protonSender = protonSender.setLinkedResource(this);
-        this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     @Override
@@ -127,10 +121,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
 
     //----- Internal API
 
-    boolean isSendingSettled() {
-        return sendsSettled;
-    }
-
     StreamSenderOptions options() {
         return this.options;
     }
@@ -140,15 +130,6 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
         return this;
     }
 
-    Sender getProtonSender() {
-        return protonSender;
-    }
-
-    @Override
-    protected Sender protonLink() {
-        return protonSender;
-    }
-
     private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
@@ -227,6 +208,7 @@ public final class ClientStreamSender extends ClientLinkType<StreamSender, Sende
         return new ClientNoOpStreamTracker(this);
     }
 
+    @Override
     void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
         checkClosedOrFailed();
         executor.execute(() -> {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
index 0d5c3736..f6b17fbc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
@@ -16,42 +16,18 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamTracker;
-import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
-import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 
 /**
  * {@link StreamTracker} implementation that relies on the ClientTracker to handle the
  * basic {@link OutgoingDelivery} management.
  */
-public final class ClientStreamTracker implements StreamTracker {
-
-    private final ClientStreamSender sender;
-    private final OutgoingDelivery delivery;
-
-    private final ClientFuture<StreamTracker> remoteSettlementFuture;
-
-    private volatile boolean remotelySettled;
-    private volatile DeliveryState remoteDeliveryState;
+public final class ClientStreamTracker extends ClientTrackable<ClientStreamSender, StreamTracker> implements StreamTracker {
 
     ClientStreamTracker(ClientStreamSender sender, OutgoingDelivery delivery) {
-        this.sender = sender;
-        this.delivery = delivery;
-        this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
-        this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
-    }
-
-    OutgoingDelivery delivery() {
-        return delivery;
+        super(sender, delivery);
     }
 
     @Override
@@ -60,148 +36,7 @@ public final class ClientStreamTracker implements StreamTracker {
     }
 
     @Override
-    public synchronized DeliveryState state() {
-        return ClientDeliveryState.fromProtonType(delivery.getState());
-    }
-
-    @Override
-    public DeliveryState remoteState() {
-        return remoteDeliveryState;
-    }
-
-    @Override
-    public boolean remoteSettled() {
-        return remotelySettled;
-    }
-
-    @Override
-    public StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException {
-        try {
-            sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
-        } finally {
-            if (settle) {
-                remoteSettlementFuture.complete(this);
-            }
-        }
-
+    protected StreamTracker self() {
         return this;
     }
-
-    @Override
-    public StreamTracker settle() throws ClientException {
-        try {
-            sender.disposition(delivery, null, true);
-        } finally {
-            remoteSettlementFuture.complete(this);
-        }
-
-        return this;
-    }
-
-    @Override
-    public synchronized boolean settled() {
-        return delivery.isSettled();
-    }
-
-    @Override
-    public ClientFuture<StreamTracker> settlementFuture() {
-        if (delivery.isSettled()) {
-            remoteSettlementFuture.complete(this);
-        }
-
-        return remoteSettlementFuture;
-    }
-
-    @Override
-    public StreamTracker awaitSettlement() throws ClientException {
-        try {
-            if (settled()) {
-                return this;
-            } else {
-                return settlementFuture().get();
-            }
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (InterruptedException e) {
-            Thread.interrupted();
-            throw new ClientException("Wait for settlement was interrupted", e);
-        }
-    }
-
-    @Override
-    public StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
-        try {
-            if (settled()) {
-                return this;
-            } else {
-                return settlementFuture().get(timeout, unit);
-            }
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for settlement was interrupted", ie);
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (TimeoutException te) {
-            throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
-        }
-    }
-
-    @Override
-    public StreamTracker awaitAccepted() throws ClientException {
-        try {
-            if (settled() && !remoteSettled()) {
-                return this;
-            } else {
-                settlementFuture().get();
-                if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
-                } else {
-                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
-                }
-            }
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
-        }
-    }
-
-    @Override
-    public StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
-        try {
-            if (settled() && !remoteSettled()) {
-                return this;
-            } else {
-                settlementFuture().get(timeout, unit);
-                if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
-                } else {
-                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
-                }
-            }
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (TimeoutException te) {
-            throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
-        }
-    }
-
-    //----- Internal Event hooks for delivery updates
-
-    private void processDeliveryUpdated(OutgoingDelivery delivery) {
-        remotelySettled = delivery.isRemotelySettled();
-        remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
-        if (delivery.isRemotelySettled()) {
-            remoteSettlementFuture.complete(this);
-        }
-
-        if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
-            delivery.settle();
-        }
-    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
similarity index 78%
copy from protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
copy to protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
index 862bf6a4..0fb08837 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTrackable.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.qpid.protonj2.client.impl;
 
 import java.util.concurrent.ExecutionException;
@@ -21,8 +22,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.qpid.protonj2.client.DeliveryState;
-import org.apache.qpid.protonj2.client.Sender;
-import org.apache.qpid.protonj2.client.Tracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
@@ -30,14 +29,14 @@ import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 
 /**
- * Client outgoing delivery tracker object.
+ * Base type used to provide some common plumbing for Tracker types
  */
-class ClientTracker implements Tracker {
+public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>, TrackerType> {
 
-    private final ClientSender sender;
-    private final OutgoingDelivery delivery;
+    protected final SenderType sender;
+    protected final OutgoingDelivery delivery;
 
-    private final ClientFuture<Tracker> remoteSettlementFuture;
+    private final ClientFuture<TrackerType> remoteSettlementFuture;
 
     private volatile boolean remotelySettled;
     private volatile DeliveryState remoteDeliveryState;
@@ -50,80 +49,69 @@ class ClientTracker implements Tracker {
      * @param delivery
      *      The proton outgoing delivery object that backs this tracker.
      */
-    ClientTracker(ClientSender sender, OutgoingDelivery delivery) {
+    ClientTrackable(SenderType sender, OutgoingDelivery delivery) {
         this.sender = sender;
         this.delivery = delivery;
         this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
         this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
     }
 
+    protected abstract TrackerType self();
+
     OutgoingDelivery delivery() {
         return delivery;
     }
 
-    @Override
-    public Sender sender() {
-        return sender;
-    }
-
-    @Override
     public synchronized DeliveryState state() {
         return ClientDeliveryState.fromProtonType(delivery.getState());
     }
 
-    @Override
     public DeliveryState remoteState() {
         return remoteDeliveryState;
     }
 
-    @Override
     public boolean remoteSettled() {
         return remotelySettled;
     }
 
-    @Override
-    public Tracker disposition(DeliveryState state, boolean settle) throws ClientException {
+    public TrackerType disposition(DeliveryState state, boolean settle) throws ClientException {
         try {
             sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
         } finally {
             if (settle) {
-                remoteSettlementFuture.complete(this);
+                remoteSettlementFuture.complete(self());
             }
         }
 
-        return this;
+        return self();
     }
 
-    @Override
-    public Tracker settle() throws ClientException {
+    public TrackerType settle() throws ClientException {
         try {
             sender.disposition(delivery, null, true);
         } finally {
-            remoteSettlementFuture.complete(this);
+            remoteSettlementFuture.complete(self());
         }
 
-        return this;
+        return self();
     }
 
-    @Override
     public synchronized boolean settled() {
         return delivery.isSettled();
     }
 
-    @Override
-    public ClientFuture<Tracker> settlementFuture() {
+    public ClientFuture<TrackerType> settlementFuture() {
         if (delivery.isSettled()) {
-            remoteSettlementFuture.complete(this);
+            remoteSettlementFuture.complete(self());
         }
 
         return remoteSettlementFuture;
     }
 
-    @Override
-    public Tracker awaitSettlement() throws ClientException {
+    public TrackerType awaitSettlement() throws ClientException {
         try {
             if (settled()) {
-                return this;
+                return self();
             } else {
                 return settlementFuture().get();
             }
@@ -135,11 +123,10 @@ class ClientTracker implements Tracker {
         }
     }
 
-    @Override
-    public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
+    public TrackerType awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
         try {
             if (settled()) {
-                return this;
+                return self();
             } else {
                 return settlementFuture().get(timeout, unit);
             }
@@ -153,15 +140,14 @@ class ClientTracker implements Tracker {
         }
     }
 
-    @Override
-    public Tracker awaitAccepted() throws ClientException {
+    public TrackerType awaitAccepted() throws ClientException {
         try {
             if (settled() && !remoteSettled()) {
-                return this;
+                return self();
             } else {
                 settlementFuture().get();
                 if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
+                    return self();
                 } else {
                     throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
                 }
@@ -174,15 +160,14 @@ class ClientTracker implements Tracker {
         }
     }
 
-    @Override
-    public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
+    public TrackerType awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
         try {
             if (settled() && !remoteSettled()) {
-                return this;
+                return self();
             } else {
                 settlementFuture().get(timeout, unit);
                 if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
+                    return self();
                 } else {
                     throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
                 }
@@ -204,10 +189,10 @@ class ClientTracker implements Tracker {
         remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
 
         if (delivery.isRemotelySettled()) {
-            remoteSettlementFuture.complete(this);
+            remoteSettlementFuture.complete(self());
         }
 
-        if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
+        if (sender.options.autoSettle() && delivery.isRemotelySettled()) {
             delivery.settle();
         }
     }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
index 862bf6a4..23b9911f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTracker.java
@@ -16,31 +16,13 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.qpid.protonj2.client.DeliveryState;
-import org.apache.qpid.protonj2.client.Sender;
 import org.apache.qpid.protonj2.client.Tracker;
-import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
-import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 
 /**
  * Client outgoing delivery tracker object.
  */
-class ClientTracker implements Tracker {
-
-    private final ClientSender sender;
-    private final OutgoingDelivery delivery;
-
-    private final ClientFuture<Tracker> remoteSettlementFuture;
-
-    private volatile boolean remotelySettled;
-    private volatile DeliveryState remoteDeliveryState;
+public final class ClientTracker extends ClientTrackable<ClientSender, Tracker> implements Tracker {
 
     /**
      * Create an instance of a client outgoing delivery tracker.
@@ -51,164 +33,16 @@ class ClientTracker implements Tracker {
      *      The proton outgoing delivery object that backs this tracker.
      */
     ClientTracker(ClientSender sender, OutgoingDelivery delivery) {
-        this.sender = sender;
-        this.delivery = delivery;
-        this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
-        this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
-    }
-
-    OutgoingDelivery delivery() {
-        return delivery;
+        super(sender, delivery);
     }
 
     @Override
-    public Sender sender() {
+    public ClientSender sender() {
         return sender;
     }
 
     @Override
-    public synchronized DeliveryState state() {
-        return ClientDeliveryState.fromProtonType(delivery.getState());
-    }
-
-    @Override
-    public DeliveryState remoteState() {
-        return remoteDeliveryState;
-    }
-
-    @Override
-    public boolean remoteSettled() {
-        return remotelySettled;
-    }
-
-    @Override
-    public Tracker disposition(DeliveryState state, boolean settle) throws ClientException {
-        try {
-            sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
-        } finally {
-            if (settle) {
-                remoteSettlementFuture.complete(this);
-            }
-        }
-
+    protected Tracker self() {
         return this;
     }
-
-    @Override
-    public Tracker settle() throws ClientException {
-        try {
-            sender.disposition(delivery, null, true);
-        } finally {
-            remoteSettlementFuture.complete(this);
-        }
-
-        return this;
-    }
-
-    @Override
-    public synchronized boolean settled() {
-        return delivery.isSettled();
-    }
-
-    @Override
-    public ClientFuture<Tracker> settlementFuture() {
-        if (delivery.isSettled()) {
-            remoteSettlementFuture.complete(this);
-        }
-
-        return remoteSettlementFuture;
-    }
-
-    @Override
-    public Tracker awaitSettlement() throws ClientException {
-        try {
-            if (settled()) {
-                return this;
-            } else {
-                return settlementFuture().get();
-            }
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (InterruptedException e) {
-            Thread.interrupted();
-            throw new ClientException("Wait for settlement was interrupted", e);
-        }
-    }
-
-    @Override
-    public Tracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
-        try {
-            if (settled()) {
-                return this;
-            } else {
-                return settlementFuture().get(timeout, unit);
-            }
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for settlement was interrupted", ie);
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (TimeoutException te) {
-            throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
-        }
-    }
-
-    @Override
-    public Tracker awaitAccepted() throws ClientException {
-        try {
-            if (settled() && !remoteSettled()) {
-                return this;
-            } else {
-                settlementFuture().get();
-                if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
-                } else {
-                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
-                }
-            }
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
-        }
-    }
-
-    @Override
-    public Tracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
-        try {
-            if (settled() && !remoteSettled()) {
-                return this;
-            } else {
-                settlementFuture().get(timeout, unit);
-                if (remoteState() != null && remoteState().isAccepted()) {
-                    return this;
-                } else {
-                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
-                }
-            }
-        } catch (InterruptedException ie) {
-            Thread.interrupted();
-            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
-        } catch (ExecutionException exe) {
-            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
-        } catch (TimeoutException te) {
-            throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
-        }
-    }
-
-    //----- Internal Event hooks for delivery updates
-
-    private void processDeliveryUpdated(OutgoingDelivery delivery) {
-        remotelySettled = delivery.isRemotelySettled();
-        remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-
-        if (delivery.isRemotelySettled()) {
-            remoteSettlementFuture.complete(this);
-        }
-
-        if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
-            delivery.settle();
-        }
-    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
index 0ca1bf38..6f58d242 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
@@ -28,7 +28,7 @@ import org.apache.qpid.protonj2.types.transport.DeliveryState;
  * to mask from the senders and receivers the work of deciding transaction
  * specific behaviors.
  */
-public interface ClientTransactionContext {
+interface ClientTransactionContext {
 
     public interface Sendable {
 
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 338cc178..5d0b7a84 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -245,7 +245,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             senderOptions.targetOptions().capabilities("queue");
             ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
 
-            assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity());
+            assertEquals(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
 
             sender.openFuture().get();
             sender.close();
@@ -282,7 +282,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             senderOptions.targetOptions().capabilities("queue");
             ClientStreamSender sender = (ClientStreamSender) connection.openStreamSender("test-queue", senderOptions);
 
-            assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.getProtonSender().getSession().getOutgoingCapacity());
+            assertEquals(PENDING_WRITES_BUFFER_SIZE, sender.protonLink().getSession().getOutgoingCapacity());
 
             sender.openFuture().get();
             sender.close();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org