Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/06 18:45:24 UTC

[qpid-protonj2] branch main updated (4080091d -> ce2f8294)

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


    from 4080091d PROTON-2539 Update Mockito to 4.5.1
     new 41464d3b PROTON-2541 Sender and Receiver link type API updates and fixes
     new ce2f8294 PROTON-2533 Update netty tc-natives to 2.0.52.Final

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |   2 +-
 .../protonj2/client/{Sender.java => Link.java}     | 173 ++----
 .../{SenderOptions.java => LinkOptions.java}       | 212 +++----
 .../org/apache/qpid/protonj2/client/Receiver.java  | 169 +----
 .../qpid/protonj2/client/ReceiverOptions.java      | 284 +--------
 .../org/apache/qpid/protonj2/client/Sender.java    | 174 +-----
 .../apache/qpid/protonj2/client/SenderOptions.java | 291 +--------
 .../qpid/protonj2/client/StreamDelivery.java       | 160 ++++-
 .../qpid/protonj2/client/StreamReceiver.java       |  47 +-
 .../protonj2/client/StreamReceiverOptions.java     | 142 +++--
 .../apache/qpid/protonj2/client/StreamSender.java  |  58 +-
 .../qpid/protonj2/client/StreamSenderOptions.java  | 104 ++--
 .../apache/qpid/protonj2/client/StreamTracker.java | 115 +++-
 .../qpid/protonj2/client/impl/ClientLinkType.java  | 508 +++++++++++++++
 .../client/impl/ClientLocalTransactionContext.java |  10 +-
 .../client/impl/ClientNoOpStreamTracker.java       |  56 +-
 .../client/impl/ClientNoOpTransactionContext.java  |   6 +-
 .../client/impl/ClientOutgoingEnvelope.java        | 230 -------
 .../qpid/protonj2/client/impl/ClientReceiver.java  | 437 ++-----------
 .../client/impl/ClientReceiverBuilder.java         |  11 +-
 .../qpid/protonj2/client/impl/ClientSender.java    | 683 ++++++---------------
 .../protonj2/client/impl/ClientSenderBuilder.java  |   9 +-
 .../protonj2/client/impl/ClientSenderLinkType.java |  45 ++
 .../qpid/protonj2/client/impl/ClientSession.java   |  16 +-
 .../protonj2/client/impl/ClientStreamReceiver.java | 469 ++------------
 .../protonj2/client/impl/ClientStreamSender.java   | 437 ++++++++++++-
 .../client/impl/ClientStreamSenderMessage.java     |   2 +-
 .../protonj2/client/impl/ClientStreamTracker.java  | 160 ++++-
 .../client/impl/ClientTransactionContext.java      |  29 +-
 .../qpid/protonj2/client/impl/SenderTest.java      |   7 +-
 .../protonj2/client/impl/StreamReceiverTest.java   |  11 +-
 .../protonj2/client/impl/StreamSenderTest.java     |  43 +-
 32 files changed, 2153 insertions(+), 2947 deletions(-)
 copy protonj2-client/src/main/java/org/apache/qpid/protonj2/client/{Sender.java => Link.java} (51%)
 copy protonj2-client/src/main/java/org/apache/qpid/protonj2/client/{SenderOptions.java => LinkOptions.java} (60%)
 create mode 100644 protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
 delete mode 100644 protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientOutgoingEnvelope.java
 create mode 100644 protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java




[qpid-protonj2] 01/02: PROTON-2541 Sender and Receiver link type API updates and fixes

Posted by ta...@apache.org.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git

commit 41464d3b2ae60b37f8e76e8b901d21b625a4c246
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri May 6 14:43:27 2022 -0400

    PROTON-2541 Sender and Receiver link type API updates and fixes
    
    Better separate the sender and receiver APIs from the streams versions,
    which have several underlying behavioral differences, and create some
    common link type base code to reduce code duplication in the various
    link implementations, making it easier to add future link types such as
    asynchronous listeners.
---
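A minimal sketch of the self-typed options pattern that this change
introduces with LinkOptions<T extends LinkOptions<T>>, where the base-class
setters return self() so that chained calls keep the concrete subtype. The
Demo* class names, the abstract self() declaration and the default values
are assumptions made for illustration only; the diff below does not show
how self() is declared or how SenderOptions builds on the new base class.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a shared options base class (not the protonj2 class).
abstract class DemoLinkOptions<T extends DemoLinkOptions<T>> {

    private long openTimeout = 15_000; // arbitrary demo default, in milliseconds

    // Assumption: each concrete subclass returns itself, typed as T.
    protected abstract T self();

    // Shared setter: returning T instead of the base type keeps fluent
    // chains usable with subclass-only setters afterwards.
    public T openTimeout(long timeout, TimeUnit units) {
        this.openTimeout = units.toMillis(timeout);
        return self();
    }

    public long openTimeout() {
        return openTimeout;
    }
}

// Hypothetical sender-specific options that add one extra setting.
final class DemoSenderOptions extends DemoLinkOptions<DemoSenderOptions> {

    private long sendTimeout = -1; // arbitrary demo default

    @Override
    protected DemoSenderOptions self() {
        return this;
    }

    public DemoSenderOptions sendTimeout(long millis) {
        this.sendTimeout = millis;
        return this;
    }

    public long sendTimeout() {
        return sendTimeout;
    }
}

class LinkOptionsSketch {
    public static void main(String[] args) {
        // openTimeout(...) is declared on the base class yet returns
        // DemoSenderOptions, so sendTimeout(...) can follow it directly.
        DemoSenderOptions options = new DemoSenderOptions()
            .openTimeout(2, TimeUnit.SECONDS)
            .sendTimeout(500);

        System.out.println(options.openTimeout() + " " + options.sendTimeout()); // prints "2000 500"
    }
}
```

Had the base setter returned the base type instead of T, the chained
sendTimeout(500) call above would not compile without a cast, which is the
problem the self-referencing type parameter avoids.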
 .../protonj2/client/{Sender.java => Link.java}     | 173 ++----
 .../{SenderOptions.java => LinkOptions.java}       | 212 +++----
 .../org/apache/qpid/protonj2/client/Receiver.java  | 169 +----
 .../qpid/protonj2/client/ReceiverOptions.java      | 284 +--------
 .../org/apache/qpid/protonj2/client/Sender.java    | 174 +-----
 .../apache/qpid/protonj2/client/SenderOptions.java | 291 +--------
 .../qpid/protonj2/client/StreamDelivery.java       | 160 ++++-
 .../qpid/protonj2/client/StreamReceiver.java       |  47 +-
 .../protonj2/client/StreamReceiverOptions.java     | 142 +++--
 .../apache/qpid/protonj2/client/StreamSender.java  |  58 +-
 .../qpid/protonj2/client/StreamSenderOptions.java  | 104 ++--
 .../apache/qpid/protonj2/client/StreamTracker.java | 115 +++-
 .../qpid/protonj2/client/impl/ClientLinkType.java  | 508 +++++++++++++++
 .../client/impl/ClientLocalTransactionContext.java |  10 +-
 .../client/impl/ClientNoOpStreamTracker.java       |  56 +-
 .../client/impl/ClientNoOpTransactionContext.java  |   6 +-
 .../client/impl/ClientOutgoingEnvelope.java        | 230 -------
 .../qpid/protonj2/client/impl/ClientReceiver.java  | 437 ++-----------
 .../client/impl/ClientReceiverBuilder.java         |  11 +-
 .../qpid/protonj2/client/impl/ClientSender.java    | 683 ++++++---------------
 .../protonj2/client/impl/ClientSenderBuilder.java  |   9 +-
 .../protonj2/client/impl/ClientSenderLinkType.java |  45 ++
 .../qpid/protonj2/client/impl/ClientSession.java   |  16 +-
 .../protonj2/client/impl/ClientStreamReceiver.java | 469 ++------------
 .../protonj2/client/impl/ClientStreamSender.java   | 437 ++++++++++++-
 .../client/impl/ClientStreamSenderMessage.java     |   2 +-
 .../protonj2/client/impl/ClientStreamTracker.java  | 160 ++++-
 .../client/impl/ClientTransactionContext.java      |  29 +-
 .../qpid/protonj2/client/impl/SenderTest.java      |   7 +-
 .../protonj2/client/impl/StreamReceiverTest.java   |  11 +-
 .../protonj2/client/impl/StreamSenderTest.java     |  43 +-
 31 files changed, 2152 insertions(+), 2946 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Link.java
similarity index 51%
copy from protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java
copy to protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Link.java
index 034743d5..addec4bb 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Link.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.qpid.protonj2.client;
 
 import java.util.Map;
@@ -22,24 +23,39 @@ import java.util.concurrent.Future;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 
 /**
- * AMQP Sender that provides an API for sending complete Message payload data.
+ * Base for all AMQP link types (Sender, Receiver etc).
  */
-public interface Sender extends AutoCloseable {
+public interface Link<T extends Link<T>> extends AutoCloseable {
 
     /**
-     * @return a {@link Future} that will be completed when the remote opens this {@link Sender}.
+     * @return a {@link Future} that will be completed when the remote opens this {@link Link}.
      */
-    Future<Sender> openFuture();
+    Future<T> openFuture();
 
     /**
-     * Requests a close of the {@link Sender} at the remote and waits until the Sender has been
-     * fully closed or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
+     * @return the {@link Client} instance that holds this session's {@link Link}
+     */
+    Client client();
+
+    /**
+     * @return the {@link Connection} instance that holds this session's {@link Link}
+     */
+    Connection connection();
+
+    /**
+     * @return the {@link Session} that created and holds this {@link Link}.
+     */
+    Session session();
+
+    /**
+     * Requests a close of the {@link Link} at the remote and waits until the Link has been
+     * fully closed or until the configured close timeout is exceeded.
      */
     @Override
     void close();
 
     /**
-     * Requests a close of the {@link Sender} at the remote and waits until the Sender has been
+     * Requests a close of the {@link Link} at the remote and waits until the Link has been
      * fully closed or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
      *
      * @param error
@@ -48,13 +64,13 @@ public interface Sender extends AutoCloseable {
     void close(ErrorCondition error);
 
     /**
-     * Requests a detach of the {@link Sender} at the remote and waits until the Sender has been
+     * Requests a detach of the {@link Link} at the remote and waits until the Link has been
      * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
      */
     void detach();
 
     /**
-     * Requests a detach of the {@link Sender} at the remote and waits until the Sender has been
+     * Requests a detach of the {@link Link} at the remote and waits until the Link has been
      * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
      *
      * @param error
@@ -63,78 +79,78 @@ public interface Sender extends AutoCloseable {
     void detach(ErrorCondition error);
 
     /**
-     * Requests a close of the {@link Sender} link at the remote and returns a {@link Future} that will be
+     * Requests a close of the {@link Link} link at the remote and returns a {@link Future} that will be
      * completed once the link has been closed.
      *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Sender} link.
+     * @return a {@link Future} that will be completed when the remote closes this {@link Link} link.
      */
-    Future<Sender> closeAsync();
+    Future<T> closeAsync();
 
     /**
-     * Requests a close of the {@link Sender} link at the remote and returns a {@link Future} that will be
+     * Requests a close of the {@link Link} link at the remote and returns a {@link Future} that will be
      * completed once the link has been closed.
      *
      * @param error
      * 		The {@link ErrorCondition} to transmit to the remote along with the close operation.
      *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Sender} link.
+     * @return a {@link Future} that will be completed when the remote closes this {@link Link} link.
      */
-    Future<Sender> closeAsync(ErrorCondition error);
+    Future<T> closeAsync(ErrorCondition error);
 
     /**
-     * Requests a detach of the {@link Sender} link at the remote and returns a {@link Future} that will be
+     * Requests a detach of the {@link Link} link at the remote and returns a {@link Future} that will be
      * completed once the link has been detached.
      *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Sender} link.
+     * @return a {@link Future} that will be completed when the remote detaches this {@link Link} link.
      */
-    Future<Sender> detachAsync();
+    Future<T> detachAsync();
 
     /**
-     * Requests a detach of the {@link Sender} link at the remote and returns a {@link Future} that will be
+     * Requests a detach of the {@link Link} link at the remote and returns a {@link Future} that will be
      * completed once the link has been detached.
      *
      * @param error
      * 		The {@link ErrorCondition} to transmit to the remote along with the detach operation.
      *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Sender} link.
+     * @return a {@link Future} that will be completed when the remote detaches this {@link Link} link.
      */
-    Future<Sender> detachAsync(ErrorCondition error);
+    Future<T> detachAsync(ErrorCondition error);
 
     /**
-     * Returns the address that the {@link Sender} instance will send {@link Message} objects
-     * to.  The value returned from this method is control by the configuration that was used
-     * to create the sender.
+     * Returns the address that the {@link Link} instance will be subscribed to. This method can
+     * block based on the type of link and how it was configured.
      *
      * <ul>
      *  <li>
-     *    If the Sender is configured as an anonymous sender then this method returns null.
+     *    If the link is a Sender and it was configured as an anonymous sender then this method
+     *    returns null as the link has no address.
      *  </li>
      *  <li>
-     *    If the Sender was created with the dynamic sender methods then the method will return
-     *    the dynamically created address once the remote has attached its end of the sender link.
-     *    Due to the need to await the remote peer to populate the dynamic address this method will
-     *    block until the open of the sender link has completed.
+     *   If a link was created with the dynamic node value enabled then the method will return
+     *   the dynamically created address once the remote has attached its end of the opened link.
+     *   Due to the need to await the remote peer to populate the dynamic address this method will
+     *   block until the open of the link has completed.
      *  </li>
      *  <li>
-     *    If neither of the above is true then the address returned is the address passed to the original
-     *    {@link Session#openSender(String)} or {@link Session#openSender(String, SenderOptions)} methods.
+     *   If not a dynamic link then the address returned is the address passed to the original
+     *   link creation method.
      *  </li>
      * </ul>
      *
-     * @return the address that this {@link Sender} is sending to.
+     * @return the address that this {@link Link} was assigned to.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} address.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} address.
      */
     String address() throws ClientException;
 
     /**
-     * Returns an immutable view of the remote {@link Source} object assigned to this sender link.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
+     * Returns an immutable view of the remote {@link Source} object assigned to this link.  If the attach
+     * has not completed yet this method will block to await the attach response which carries the remote
      * {@link Source}.
      *
      * @return the remote {@link Source} node configuration.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote {@link Source}.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} remote {@link Source}.
      */
     Source source() throws ClientException;
 
@@ -145,112 +161,41 @@ public interface Sender extends AutoCloseable {
      *
      * @return the remote {@link Target} node configuration.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote {@link Target}.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} remote {@link Target}.
      */
     Target target() throws ClientException;
 
     /**
-     * Returns the properties that the remote provided upon successfully opening the {@link Sender}.  If the
+     * Returns the properties that the remote provided upon successfully opening the {@link Link}.  If the
      * attach has not completed yet this method will block to await the attach response which carries the remote
      * properties.  If the remote provides no properties this method will return null.
      *
      * @return any properties provided from the remote once the sender has successfully opened.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote properties.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} remote properties.
      */
     Map<String, Object> properties() throws ClientException;
 
     /**
-     * Returns the offered capabilities that the remote provided upon successfully opening the {@link Sender}.
+     * Returns the offered capabilities that the remote provided upon successfully opening the {@link Link}.
      * If the attach has not completed yet this method will block to await the attach response which carries the
      * remote offered capabilities.  If the remote provides no capabilities this method will return null.
      *
      * @return any capabilities provided from the remote once the sender has successfully opened.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote offered capabilities.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} remote offered capabilities.
      */
     String[] offeredCapabilities() throws ClientException;
 
     /**
-     * Returns the desired capabilities that the remote provided upon successfully opening the {@link Sender}.
+     * Returns the desired capabilities that the remote provided upon successfully opening the {@link Link}.
      * If the attach has not completed yet this method will block to await the attach response which carries the
      * remote desired capabilities.  If the remote provides no capabilities this method will return null.
      *
      * @return any desired capabilities provided from the remote once the sender has successfully opened.
      *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote desired capabilities.
+     * @throws ClientException if an error occurs while obtaining the {@link Link} remote desired capabilities.
      */
     String[] desiredCapabilities() throws ClientException;
 
-    /**
-     * @return the {@link Client} instance that holds this session's {@link Sender}
-     */
-    Client client();
-
-    /**
-     * @return the {@link Connection} instance that holds this session's {@link Sender}
-     */
-    Connection connection();
-
-    /**
-     * @return the {@link Session} that created and holds this {@link Sender}.
-     */
-    Session session();
-
-    /**
-     * Send the given message immediately if there is credit available or blocks if the link
-     * has not yet been granted credit.
-     *
-     * @param message
-     *      the {@link Message} to send.
-     *
-     * @return the {@link Tracker} for the message delivery
-     *
-     * @throws ClientException if an error occurs while initiating the send operation.
-     */
-    Tracker send(Message<?> message) throws ClientException;
-
-    /**
-     * Send the given message immediately if there is credit available or blocks if the link
-     * has not yet been granted credit.
-     *
-     * @param message
-     *      the {@link Message} to send.
-     * @param deliveryAnnotations
-     *      the delivery annotations that should be included in the sent {@link Message}.
-     *
-     * @return the {@link Tracker} for the message delivery
-     *
-     * @throws ClientException if an error occurs while initiating the send operation.
-     */
-    Tracker send(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException;
-
-    /**
-     * Send the given message if credit is available or returns null if no credit has been
-     * granted to the link at the time of the send attempt.
-     *
-     * @param message
-     *      the {@link Message} to send if credit is available.
-     *
-     * @return the {@link Tracker} for the message delivery or null if no credit for sending.
-     *
-     * @throws ClientException if an error occurs while initiating the send operation.
-     */
-    Tracker trySend(Message<?> message) throws ClientException;
-
-    /**
-     * Send the given message if credit is available or returns null if no credit has been
-     * granted to the link at the time of the send attempt.
-     *
-     * @param message
-     *      the {@link Message} to send if credit is available.
-     * @param deliveryAnnotations
-     *      the delivery annotations that should be included in the sent {@link Message}.
-     *
-     * @return the {@link Tracker} for the message delivery or null if no credit for sending.
-     *
-     * @throws ClientException if an error occurs while initiating the send operation.
-     */
-    Tracker trySend(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException;
-
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/LinkOptions.java
similarity index 60%
copy from protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
copy to protonj2-client/src/main/java/org/apache/qpid/protonj2/client/LinkOptions.java
index 7097d43b..d94e381f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/LinkOptions.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.qpid.protonj2.client;
 
 import java.util.Arrays;
@@ -22,21 +23,19 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 
 /**
- * Options that control the behavior of a {@link Sender} created from them.
+ * Base options that are applied to AMQP link types.
  */
-public class SenderOptions {
+public abstract class LinkOptions<T extends LinkOptions<T>> {
 
-    private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
     private long requestTimeout = ConnectionOptions.DEFAULT_REQUEST_TIMEOUT;
     private long openTimeout = ConnectionOptions.DEFAULT_OPEN_TIMEOUT;
     private long closeTimeout = ConnectionOptions.DEFAULT_CLOSE_TIMEOUT;
 
-    private String linkName;
     private boolean autoSettle = true;
     private DeliveryMode deliveryMode = DeliveryMode.AT_LEAST_ONCE;
+    private String linkName;
 
     private final SourceOptions source = new SourceOptions();
     private final TargetOptions target = new TargetOptions();
@@ -46,94 +45,86 @@ public class SenderOptions {
     private Map<String, Object> properties;
 
     /**
-     * Create a new {@link SenderOptions} instance configured with default configuration settings.
+     * Create a new LinkOptions instance with defaults set for all options.
      */
-    public SenderOptions() {
+    public LinkOptions() {
     }
 
     /**
-     * Create a new SenderOptions instance that copies the configuration from the specified source options.
+     * Create a new LinkOptions instance that copies the configuration from the specified source options.
      *
      * @param options
-     * 		The SenderOptions instance whose settings are to be copied into this one.
+     * 		The LinkOptions instance whose settings are to be copied into this one.
      */
-    public SenderOptions(SenderOptions options) {
+    public LinkOptions(LinkOptions<T> options) {
         if (options != null) {
             options.copyInto(this);
         }
     }
 
     /**
-     * Configures the link name to use when creating a given {@link Sender} instance.
+     * Controls if the created Link will automatically settle the deliveries that have
+     * been received by the application (default is <code>true</code>).
      *
-     * @param linkName
-     *      The assigned link name to use when creating a {@link Sender}.
+     * @param autoSettle
+     *      The value to assign for auto delivery settlement.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions linkName(String linkName) {
-        this.linkName = linkName;
-        return this;
+    public T autoSettle(boolean autoSettle) {
+        this.autoSettle = autoSettle;
+        return self();
     }
 
     /**
-     * @return the configured link name to use when creating a {@link Sender}.
+     * @return the current value of the {@link Link} auto settlement setting.
      */
-    public String linkName() {
-        return linkName;
+    public boolean autoSettle() {
+        return autoSettle;
     }
 
     /**
-     * Sets whether sent deliveries should be automatically locally-settled once
-     * they have become remotely-settled by the receiving peer.
+     * Sets the {@link DeliveryMode} value to assign to newly created {@link Link} instances.
      *
-     * True by default.
-     *
-     * @param autoSettle
-     *            whether deliveries should be auto settled locally after being
-     *            settled by the receiver
+     * @param deliveryMode
+     *      The delivery mode value to configure.
      *
-     * @return the sender
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions autoSettle(boolean autoSettle) {
-        this.autoSettle = autoSettle;
-        return this;
+    public T deliveryMode(DeliveryMode deliveryMode) {
+        this.deliveryMode = deliveryMode;
+        return self();
     }
 
     /**
-     * Get whether the {@link Sender} is auto settling deliveries.
-     *
-     * @return whether deliveries should be auto settled locally after being settled
-     *         by the receiver
-     *
-     * @see #autoSettle(boolean)
+     * @return the current value of the {@link Link} delivery mode configuration.
      */
-    public boolean autoSettle() {
-        return autoSettle;
+    public DeliveryMode deliveryMode() {
+        return deliveryMode;
     }
 
     /**
-     * Sets the {@link DeliveryMode} value to assign to newly created {@link Sender} instances.
+     * Configures the link name to use when creating a given {@link Link} instance.
      *
-     * @param deliveryMode
-     *      The delivery mode value to configure.
+     * @param linkName
+     *      The assigned link name to use when creating a {@link Link}.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions deliveryMode(DeliveryMode deliveryMode) {
-        this.deliveryMode = deliveryMode;
-        return this;
+    public T linkName(String linkName) {
+        this.linkName = linkName;
+        return self();
     }
 
     /**
-     * @return the current value of the {@link Sender} delivery mode configuration.
+     * @return the configured link name to use when creating a {@link Link}.
      */
-    public DeliveryMode deliveryMode() {
-        return deliveryMode;
+    public String linkName() {
+        return linkName;
     }
 
     /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Sender} is closed.
+     * @return the timeout used when awaiting a response from the remote when a {@link Link} is closed.
      */
     public long closeTimeout() {
         return closeTimeout;
@@ -141,35 +132,35 @@ public class SenderOptions {
 
     /**
      * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Sender} link.
+     * the {@link Link} link.
      *
      * @param closeTimeout
      *      Timeout value in milliseconds to wait for a remote response.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions closeTimeout(long closeTimeout) {
+    public T closeTimeout(long closeTimeout) {
         return closeTimeout(closeTimeout, TimeUnit.MILLISECONDS);
     }
 
     /**
      * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Sender} link.
+     * the {@link Link} link.
      *
      * @param timeout
      *      Timeout value to wait for a remote response.
      * @param units
      * 		The {@link TimeUnit} that defines the timeout span.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions closeTimeout(long timeout, TimeUnit units) {
+    public T closeTimeout(long timeout, TimeUnit units) {
         this.closeTimeout = units.toMillis(timeout);
-        return this;
+        return self();
     }
 
     /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Sender} is opened.
+     * @return the timeout used when awaiting a response from the remote when a {@link Link} is opened.
      */
     public long openTimeout() {
         return openTimeout;
@@ -177,71 +168,31 @@ public class SenderOptions {
 
     /**
      * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Sender} has been honored.
+     * a {@link Link} has been honored.
      *
      * @param openTimeout
      *      Timeout value in milliseconds to wait for a remote response.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions openTimeout(long openTimeout) {
+    public T openTimeout(long openTimeout) {
         return openTimeout(openTimeout, TimeUnit.MILLISECONDS);
     }
 
     /**
      * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Sender} has been honored.
+     * a {@link Link} has been honored.
      *
      * @param timeout
      *      Timeout value to wait for a remote response.
      * @param units
      * 		The {@link TimeUnit} that defines the timeout span.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions openTimeout(long timeout, TimeUnit units) {
+    public T openTimeout(long timeout, TimeUnit units) {
         this.openTimeout = units.toMillis(timeout);
-        return this;
-    }
-
-    /**
-     * @return the timeout used when awaiting a response from the remote when a resource is message send.
-     */
-    public long sendTimeout() {
-        return sendTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a send operation to complete.  A send will block if the
-     * remote has not granted the {@link Sender} or the {@link Session} credit to do so, if the send blocks
-     * for longer than this timeout the send call will fail with an {@link ClientSendTimedOutException}
-     * exception to indicate that the send did not complete.
-     *
-     * @param sendTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions sendTimeout(long sendTimeout) {
-        return sendTimeout(sendTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a send operation to complete.  A send will block if the
-     * remote has not granted the {@link Sender} or the {@link Session} credit to do so, if the send blocks
-     * for longer than this timeout the send call will fail with an {@link ClientSendTimedOutException}
-     * exception to indicate that the send did not complete.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions sendTimeout(long timeout, TimeUnit units) {
-        this.sendTimeout = units.toMillis(timeout);
-        return this;
+        return self();
     }
 
     /**
@@ -261,9 +212,9 @@ public class SenderOptions {
      * @param requestTimeout
      *      Timeout value in milliseconds to wait for a remote response.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions requestTimeout(long requestTimeout) {
+    public T requestTimeout(long requestTimeout) {
         return requestTimeout(requestTimeout, TimeUnit.MILLISECONDS);
     }
 
@@ -279,11 +230,11 @@ public class SenderOptions {
      * @param units
      * 		The {@link TimeUnit} that defines the timeout span.
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions requestTimeout(long timeout, TimeUnit units) {
+    public T requestTimeout(long timeout, TimeUnit units) {
         this.requestTimeout = units.toMillis(timeout);
-        return this;
+        return self();
     }
 
     /**
@@ -296,11 +247,11 @@ public class SenderOptions {
     /**
      * @param offeredCapabilities the offeredCapabilities to set
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions offeredCapabilities(String... offeredCapabilities) {
+    public T offeredCapabilities(String... offeredCapabilities) {
         this.offeredCapabilities = offeredCapabilities;
-        return this;
+        return self();
     }
 
     /**
@@ -313,11 +264,11 @@ public class SenderOptions {
     /**
      * @param desiredCapabilities the desiredCapabilities to set
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions desiredCapabilities(String... desiredCapabilities) {
+    public T desiredCapabilities(String... desiredCapabilities) {
         this.desiredCapabilities = desiredCapabilities;
-        return this;
+        return self();
     }
 
     /**
@@ -330,34 +281,29 @@ public class SenderOptions {
     /**
      * @param properties the properties to set
      *
-     * @return this {@link SenderOptions} instance.
+     * @return this {@link LinkOptions} instance.
      */
-    public SenderOptions properties(Map<String, Object> properties) {
+    public T properties(Map<String, Object> properties) {
         this.properties = properties;
-        return this;
+        return self();
     }
 
     /**
-     * @return the source
+     * @return the source options that will be used when creating new {@link Link} instances.
      */
     public SourceOptions sourceOptions() {
         return source;
     }
 
     /**
-     * @return the target
+     * @return the target options that will be used when creating new {@link Link} instances.
      */
     public TargetOptions targetOptions() {
         return target;
     }
 
-    @Override
-    public SenderOptions clone() {
-        return copyInto(new SenderOptions());
-    }
-
     /**
-     * Copy all options from this {@link SenderOptions} instance into the instance
+     * Copy all options from this {@link LinkOptions} instance into the instance
      * provided.
      *
      * @param other
@@ -365,13 +311,13 @@ public class SenderOptions {
      *
      * @return this options class for chaining.
      */
-    protected SenderOptions copyInto(SenderOptions other) {
-        other.autoSettle(autoSettle);
+    protected LinkOptions<T> copyInto(LinkOptions<T> other) {
         other.linkName(linkName);
         other.closeTimeout(closeTimeout);
         other.openTimeout(openTimeout);
-        other.sendTimeout(sendTimeout);
         other.requestTimeout(requestTimeout);
+        other.deliveryMode(deliveryMode);
+        other.autoSettle(autoSettle);
 
         if (offeredCapabilities != null) {
             other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
@@ -388,4 +334,10 @@ public class SenderOptions {
 
         return this;
     }
+
+    /**
+     * @return the true derived type instance for use in this class.
+     */
+    protected abstract T self();
+
 }
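The LinkOptions hunks above replace `return this` with `return self()` so that setters declared once in the shared base class still hand back the concrete options type when chained. The following is only a minimal sketch of that self-typed pattern, not the committed code: `BaseOptions`, `ExampleSenderOptions`, the recursive bound `T extends BaseOptions<T>` and the 15000 ms default are all hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a shared options base class; T is the concrete subtype.
abstract class BaseOptions<T extends BaseOptions<T>> {

    private long openTimeout = 15_000; // illustrative default, in milliseconds

    public long openTimeout() {
        return openTimeout;
    }

    // Declared once in the base class, yet returns the derived type via self().
    public T openTimeout(long timeout, TimeUnit units) {
        this.openTimeout = units.toMillis(timeout);
        return self();
    }

    // Each concrete subclass returns itself, which avoids an unchecked cast here.
    protected abstract T self();
}

// Hypothetical subclass adding one option of its own.
final class ExampleSenderOptions extends BaseOptions<ExampleSenderOptions> {

    private long sendTimeout = -1;

    public long sendTimeout() {
        return sendTimeout;
    }

    public ExampleSenderOptions sendTimeout(long timeout, TimeUnit units) {
        this.sendTimeout = units.toMillis(timeout);
        return this;
    }

    @Override
    protected ExampleSenderOptions self() {
        return this;
    }
}

class SelfTypedOptionsDemo {
    public static void main(String[] args) {
        // The base class setter returns ExampleSenderOptions, so the
        // subclass-only setter can follow it in the same chain.
        ExampleSenderOptions options = new ExampleSenderOptions()
            .openTimeout(5, TimeUnit.SECONDS)
            .sendTimeout(2, TimeUnit.SECONDS);

        System.out.println(options.openTimeout() + " " + options.sendTimeout());
    }
}
```

If the base class setter returned the base type instead, the chained `sendTimeout` call in `main` would not compile, which is presumably the motivation for the abstract `self()` hook.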
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
index 8644cf21..ba6cefd4 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Receiver.java
@@ -16,7 +16,6 @@
  */
 package org.apache.qpid.protonj2.client;
 
-import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -29,173 +28,7 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
  *
  * @see StreamReceiver
  */
-public interface Receiver extends AutoCloseable {
-
-    /**
-     * @return a {@link Future} that will be completed when the remote opens this {@link Receiver}.
-     */
-    Future<Receiver> openFuture();
-
-    /**
-     * Requests a close of the {@link Receiver} at the remote and waits until the Receiver has been
-     * fully closed or until the configured {@link ReceiverOptions#closeTimeout()} is exceeded.
-     */
-    @Override
-    void close();
-
-    /**
-     * Requests a close of the {@link Receiver} at the remote and waits until the Receiver has been
-     * fully closed or until the configured {@link ReceiverOptions#closeTimeout()} is exceeded.
-     *
-     * @param error
-     *      The {@link ErrorCondition} to transmit to the remote along with the close operation.
-     */
-    void close(ErrorCondition error);
-
-    /**
-     * Requests a detach of the {@link Receiver} at the remote and waits until the Receiver has been
-     * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     */
-    void detach();
-
-    /**
-     * Requests a detach of the {@link Receiver} at the remote and waits until the Receiver has been
-     * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     *
-     * @param error
-     *      The {@link ErrorCondition} to transmit to the remote along with the detach operation.
-     */
-    void detach(ErrorCondition error);
-
-    /**
-     * Requests a close of the {@link Receiver} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been closed.
-     *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Receiver} link.
-     */
-    Future<Receiver> closeAsync();
-
-    /**
-     * Requests a close of the {@link Receiver} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been closed.
-     *
-     * @param error
-     * 		The {@link ErrorCondition} to transmit to the remote along with the close operation.
-     *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Receiver} link.
-     */
-    Future<Receiver> closeAsync(ErrorCondition error);
-
-    /**
-     * Requests a detach of the {@link Receiver} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been detached.
-     *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Receiver} link.
-     */
-    Future<Receiver> detachAsync();
-
-    /**
-     * Requests a detach of the {@link Receiver} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been detached.
-     *
-     * @param error
-     * 		The {@link ErrorCondition} to transmit to the remote along with the detach operation.
-     *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Receiver} link.
-     */
-    Future<Receiver> detachAsync(ErrorCondition error);
-
-    /**
-     * Returns the address that the {@link Receiver} instance will be subscribed to.
-     *
-     * <ul>
-     *  <li>
-     *   If the Receiver was created with the dynamic receiver methods then the method will return
-     *   the dynamically created address once the remote has attached its end of the receiver link.
-     *   Due to the need to await the remote peer to populate the dynamic address this method will
-     *   block until the open of the receiver link has completed.
-     *  </li>
-     *  <li>
-     *   If not a dynamic receiver then the address returned is the address passed to the original
-     *   {@link Session#openReceiver(String)} or {@link Session#openReceiver(String, ReceiverOptions)} methods.
-     *  </li>
-     * </ul>
-     *
-     * @return the address that this {@link Receiver} is sending to.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} address.
-     */
-    String address() throws ClientException;
-
-    /**
-     * Returns an immutable view of the remote {@link Source} object assigned to this receiver link.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * {@link Source}.
-     *
-     * @return the remote {@link Source} node configuration.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} remote {@link Source}.
-     */
-    Source source() throws ClientException;
-
-    /**
-     * Returns an immutable view of the remote {@link Target} object assigned to this receiver link.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * {@link Source}.
-     *
-     * @return the remote {@link Target} node configuration.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} remote {@link Target}.
-     */
-    Target target() throws ClientException;
-
-    /**
-     * Returns the properties that the remote provided upon successfully opening the {@link Receiver}.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * properties.  If the remote provides no properties this method will return null.
-     *
-     * @return any properties provided from the remote once the receiver has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} remote properties.
-     */
-    Map<String, Object> properties() throws ClientException;
-
-    /**
-     * Returns the offered capabilities that the remote provided upon successfully opening the {@link Receiver}.
-     * If the attach has not completed yet this method will block to await the attach response which carries the
-     * remote offered capabilities.  If the remote provides no capabilities this method will return null.
-     *
-     * @return any capabilities provided from the remote once the receiver has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} remote offered capabilities.
-     */
-    String[] offeredCapabilities() throws ClientException;
-
-    /**
-     * Returns the desired capabilities that the remote provided upon successfully opening the {@link Receiver}.
-     * If the attach has not completed yet this method will block to await the attach response which carries the
-     * remote desired capabilities.  If the remote provides no capabilities this method will return null.
-     *
-     * @return any desired capabilities provided from the remote once the receiver has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Receiver} remote desired capabilities.
-     */
-    String[] desiredCapabilities() throws ClientException;
-
-    /**
-     * @return the {@link Client} instance that holds this session's {@link Receiver}
-     */
-    Client client();
-
-    /**
-     * @return the {@link Connection} instance that holds this session's {@link Receiver}
-     */
-    Connection connection();
-
-    /**
-     * @return the {@link Session} that created and holds this {@link Receiver}.
-     */
-    Session session();
+public interface Receiver extends Link<Receiver> {
 
     /**
      * Adds credit to the {@link Receiver} link for use when there receiver has not been configured
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ReceiverOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ReceiverOptions.java
index f279173c..fa7f3879 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ReceiverOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/ReceiverOptions.java
@@ -16,35 +16,16 @@
  */
 package org.apache.qpid.protonj2.client;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
-
 /**
  * Options that control the behavior of the {@link Receiver} created from them.
  */
-public class ReceiverOptions {
+public class ReceiverOptions extends LinkOptions<ReceiverOptions> {
 
     private long drainTimeout = ConnectionOptions.DEFAULT_DRAIN_TIMEOUT;
-    private long requestTimeout = ConnectionOptions.DEFAULT_REQUEST_TIMEOUT;
-    private long openTimeout = ConnectionOptions.DEFAULT_OPEN_TIMEOUT;
-    private long closeTimeout = ConnectionOptions.DEFAULT_CLOSE_TIMEOUT;
-
     private boolean autoAccept = true;
-    private boolean autoSettle = true;
-    private DeliveryMode deliveryMode = DeliveryMode.AT_LEAST_ONCE;
     private int creditWindow = 10;
-    private String linkName;
-
-    private final SourceOptions source = new SourceOptions();
-    private final TargetOptions target = new TargetOptions();
-
-    private String[] offeredCapabilities;
-    private String[] desiredCapabilities;
-    private Map<String, Object> properties;
 
     /**
      * Create a new ReceiverOptions instance with defaults set for all options.
@@ -85,67 +66,6 @@ public class ReceiverOptions {
         return autoAccept;
     }
 
-    /**
-     * Controls if the created Receiver will automatically settle the deliveries that have
-     * been received by the application (default is <code>true</code>).
-     *
-     * @param autoSettle
-     *      The value to assign for auto delivery settlement.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions autoSettle(boolean autoSettle) {
-        this.autoSettle = autoSettle;
-        return this;
-    }
-
-    /**
-     * @return the current value of the {@link Receiver} auto settlement setting.
-     */
-    public boolean autoSettle() {
-        return autoSettle;
-    }
-
-    /**
-     * Sets the {@link DeliveryMode} value to assign to newly created {@link Receiver} instances.
-     *
-     * @param deliveryMode
-     *      The delivery mode value to configure.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions deliveryMode(DeliveryMode deliveryMode) {
-        this.deliveryMode = deliveryMode;
-        return this;
-    }
-
-    /**
-     * @return the current value of the {@link Receiver} delivery mode configuration.
-     */
-    public DeliveryMode deliveryMode() {
-        return deliveryMode;
-    }
-
-    /**
-     * Configures the link name to use when creating a given {@link Receiver} instance.
-     *
-     * @param linkName
-     *      The assigned link name to use when creating a {@link Receiver}.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions linkName(String linkName) {
-        this.linkName = linkName;
-        return this;
-    }
-
-    /**
-     * @return the configured link name to use when creating a {@link Receiver}.
-     */
-    public String linkName() {
-        return linkName;
-    }
-
     /**
      * @return the credit window configuration that will be applied to created {@link Receiver} instances.
      */
@@ -172,78 +92,6 @@ public class ReceiverOptions {
         return this;
     }
 
-    /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Receiver} is closed.
-     */
-    public long closeTimeout() {
-        return closeTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Receiver} link.
-     *
-     * @param closeTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions closeTimeout(long closeTimeout) {
-        return closeTimeout(closeTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Receiver} link.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions closeTimeout(long timeout, TimeUnit units) {
-        this.closeTimeout = units.toMillis(timeout);
-        return this;
-    }
-
-    /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Receiver} is opened.
-     */
-    public long openTimeout() {
-        return openTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Receiver} has been honored.
-     *
-     * @param openTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions openTimeout(long openTimeout) {
-        return openTimeout(openTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Receiver} has been honored.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions openTimeout(long timeout, TimeUnit units) {
-        this.openTimeout = units.toMillis(timeout);
-        return this;
-    }
-
     /**
      * @return the configured drain timeout value that will use to fail a pending drain request.
      */
@@ -280,113 +128,6 @@ public class ReceiverOptions {
         return this;
     }
 
-    /**
-     * @return the timeout used when awaiting a response from the remote when a resource makes a request.
-     */
-    public long requestTimeout() {
-        return requestTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to
-     * perform some action such as starting a new transaction.  If the remote does not respond
-     * within the configured timeout the resource making the request will mark it as failed and
-     * return an error to the request initiator usually in the form of a
-     * {@link ClientOperationTimedOutException}.
-     *
-     * @param requestTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions requestTimeout(long requestTimeout) {
-        return requestTimeout(requestTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to
-     * perform some action such as starting a new transaction.  If the remote does not respond
-     * within the configured timeout the resource making the request will mark it as failed and
-     * return an error to the request initiator usually in the form of a
-     * {@link ClientOperationTimedOutException}.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions requestTimeout(long timeout, TimeUnit units) {
-        this.requestTimeout = units.toMillis(timeout);
-        return this;
-    }
-
-    /**
-     * @return the offeredCapabilities
-     */
-    public String[] offeredCapabilities() {
-        return offeredCapabilities;
-    }
-
-    /**
-     * @param offeredCapabilities the offeredCapabilities to set
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions offeredCapabilities(String... offeredCapabilities) {
-        this.offeredCapabilities = offeredCapabilities;
-        return this;
-    }
-
-    /**
-     * @return the desiredCapabilities
-     */
-    public String[] desiredCapabilities() {
-        return desiredCapabilities;
-    }
-
-    /**
-     * @param desiredCapabilities the desiredCapabilities to set
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions desiredCapabilities(String... desiredCapabilities) {
-        this.desiredCapabilities = desiredCapabilities;
-        return this;
-    }
-
-    /**
-     * @return the properties
-     */
-    public Map<String, Object> properties() {
-        return properties;
-    }
-
-    /**
-     * @param properties the properties to set
-     *
-     * @return this {@link ReceiverOptions} instance.
-     */
-    public ReceiverOptions properties(Map<String, Object> properties) {
-        this.properties = properties;
-        return this;
-    }
-
-    /**
-     * @return the source options that will be used when creating new {@link Receiver} instances.
-     */
-    public SourceOptions sourceOptions() {
-        return source;
-    }
-
-    /**
-     * @return the target options that will be used when creating new {@link Sender} instances.
-     */
-    public TargetOptions targetOptions() {
-        return target;
-    }
-
     @Override
     public ReceiverOptions clone() {
         return copyInto(new ReceiverOptions());
@@ -402,26 +143,17 @@ public class ReceiverOptions {
      * @return this options class for chaining.
      */
     protected ReceiverOptions copyInto(ReceiverOptions other) {
+        super.copyInto(other);
+
+        other.autoAccept(autoAccept);
         other.creditWindow(creditWindow);
-        other.linkName(linkName);
-        other.closeTimeout(closeTimeout);
-        other.openTimeout(openTimeout);
         other.drainTimeout(drainTimeout);
-        other.requestTimeout(requestTimeout);
 
-        if (offeredCapabilities != null) {
-            other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
-        }
-        if (desiredCapabilities != null) {
-            other.desiredCapabilities(Arrays.copyOf(desiredCapabilities, desiredCapabilities.length));
-        }
-        if (properties != null) {
-            other.properties(new HashMap<>(properties));
-        }
-
-        source.copyInto(other.sourceOptions());
-        target.copyInto(other.targetOptions());
+        return this;
+    }
 
+    @Override
+    protected ReceiverOptions self() {
         return this;
     }
 }
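The ReceiverOptions hunk above moves the copying of shared fields into the base class and leaves only the receiver-specific options in the subclass. A rough sketch of that layering follows, under stated assumptions: `BaseLinkOptions`, `ExampleReceiverOptions` and `copySharedInto` are hypothetical names, and the shape used here (a void helper, with `clone()` returning the freshly populated copy) is a choice made for this sketch, not a description of what the commit's `copyInto` returns.

```java
import java.util.Arrays;

// Hypothetical base class holding the options shared by every link type.
abstract class BaseLinkOptions<T extends BaseLinkOptions<T>> {

    private String linkName;
    private String[] desiredCapabilities;

    public String linkName() {
        return linkName;
    }

    public T linkName(String linkName) {
        this.linkName = linkName;
        return self();
    }

    public String[] desiredCapabilities() {
        return desiredCapabilities;
    }

    public T desiredCapabilities(String... desiredCapabilities) {
        this.desiredCapabilities = desiredCapabilities;
        return self();
    }

    // Copies only the shared state; arrays are duplicated so the copy is independent.
    protected void copySharedInto(BaseLinkOptions<T> other) {
        other.linkName(linkName);
        if (desiredCapabilities != null) {
            other.desiredCapabilities(Arrays.copyOf(desiredCapabilities, desiredCapabilities.length));
        }
    }

    protected abstract T self();
}

// Hypothetical subclass that copies its own option after the shared ones.
final class ExampleReceiverOptions extends BaseLinkOptions<ExampleReceiverOptions> {

    private int creditWindow = 10;

    public int creditWindow() {
        return creditWindow;
    }

    public ExampleReceiverOptions creditWindow(int creditWindow) {
        this.creditWindow = creditWindow;
        return this;
    }

    @Override
    public ExampleReceiverOptions clone() {
        ExampleReceiverOptions copy = new ExampleReceiverOptions();
        copySharedInto(copy);            // shared options first
        copy.creditWindow(creditWindow); // then the subclass-specific option
        return copy;
    }

    @Override
    protected ExampleReceiverOptions self() {
        return this;
    }
}
```

Keeping the shared copy logic in one place means a new common option only needs to be added to the base class helper for every subclass clone to pick it up.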
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java
index 034743d5..3e2c083c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/Sender.java
@@ -17,185 +17,13 @@
 package org.apache.qpid.protonj2.client;
 
 import java.util.Map;
-import java.util.concurrent.Future;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 
 /**
  * AMQP Sender that provides an API for sending complete Message payload data.
  */
-public interface Sender extends AutoCloseable {
-
-    /**
-     * @return a {@link Future} that will be completed when the remote opens this {@link Sender}.
-     */
-    Future<Sender> openFuture();
-
-    /**
-     * Requests a close of the {@link Sender} at the remote and waits until the Sender has been
-     * fully closed or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     */
-    @Override
-    void close();
-
-    /**
-     * Requests a close of the {@link Sender} at the remote and waits until the Sender has been
-     * fully closed or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     *
-     * @param error
-     *      The {@link ErrorCondition} to transmit to the remote along with the close operation.
-     */
-    void close(ErrorCondition error);
-
-    /**
-     * Requests a detach of the {@link Sender} at the remote and waits until the Sender has been
-     * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     */
-    void detach();
-
-    /**
-     * Requests a detach of the {@link Sender} at the remote and waits until the Sender has been
-     * fully detached or until the configured {@link SenderOptions#closeTimeout()} is exceeded.
-     *
-     * @param error
-     *      The {@link ErrorCondition} to transmit to the remote along with the detach operation.
-     */
-    void detach(ErrorCondition error);
-
-    /**
-     * Requests a close of the {@link Sender} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been closed.
-     *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Sender} link.
-     */
-    Future<Sender> closeAsync();
-
-    /**
-     * Requests a close of the {@link Sender} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been closed.
-     *
-     * @param error
-     * 		The {@link ErrorCondition} to transmit to the remote along with the close operation.
-     *
-     * @return a {@link Future} that will be completed when the remote closes this {@link Sender} link.
-     */
-    Future<Sender> closeAsync(ErrorCondition error);
-
-    /**
-     * Requests a detach of the {@link Sender} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been detached.
-     *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Sender} link.
-     */
-    Future<Sender> detachAsync();
-
-    /**
-     * Requests a detach of the {@link Sender} link at the remote and returns a {@link Future} that will be
-     * completed once the link has been detached.
-     *
-     * @param error
-     * 		The {@link ErrorCondition} to transmit to the remote along with the detach operation.
-     *
-     * @return a {@link Future} that will be completed when the remote detaches this {@link Sender} link.
-     */
-    Future<Sender> detachAsync(ErrorCondition error);
-
-    /**
-     * Returns the address that the {@link Sender} instance will send {@link Message} objects
-     * to.  The value returned from this method is control by the configuration that was used
-     * to create the sender.
-     *
-     * <ul>
-     *  <li>
-     *    If the Sender is configured as an anonymous sender then this method returns null.
-     *  </li>
-     *  <li>
-     *    If the Sender was created with the dynamic sender methods then the method will return
-     *    the dynamically created address once the remote has attached its end of the sender link.
-     *    Due to the need to await the remote peer to populate the dynamic address this method will
-     *    block until the open of the sender link has completed.
-     *  </li>
-     *  <li>
-     *    If neither of the above is true then the address returned is the address passed to the original
-     *    {@link Session#openSender(String)} or {@link Session#openSender(String, SenderOptions)} methods.
-     *  </li>
-     * </ul>
-     *
-     * @return the address that this {@link Sender} is sending to.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} address.
-     */
-    String address() throws ClientException;
-
-    /**
-     * Returns an immutable view of the remote {@link Source} object assigned to this sender link.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * {@link Source}.
-     *
-     * @return the remote {@link Source} node configuration.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote {@link Source}.
-     */
-    Source source() throws ClientException;
-
-    /**
-     * Returns an immutable view of the remote {@link Target} object assigned to this sender link.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * {@link Target}.
-     *
-     * @return the remote {@link Target} node configuration.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote {@link Target}.
-     */
-    Target target() throws ClientException;
-
-    /**
-     * Returns the properties that the remote provided upon successfully opening the {@link Sender}.  If the
-     * attach has not completed yet this method will block to await the attach response which carries the remote
-     * properties.  If the remote provides no properties this method will return null.
-     *
-     * @return any properties provided from the remote once the sender has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote properties.
-     */
-    Map<String, Object> properties() throws ClientException;
-
-    /**
-     * Returns the offered capabilities that the remote provided upon successfully opening the {@link Sender}.
-     * If the attach has not completed yet this method will block to await the attach response which carries the
-     * remote offered capabilities.  If the remote provides no capabilities this method will return null.
-     *
-     * @return any capabilities provided from the remote once the sender has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote offered capabilities.
-     */
-    String[] offeredCapabilities() throws ClientException;
-
-    /**
-     * Returns the desired capabilities that the remote provided upon successfully opening the {@link Sender}.
-     * If the attach has not completed yet this method will block to await the attach response which carries the
-     * remote desired capabilities.  If the remote provides no capabilities this method will return null.
-     *
-     * @return any desired capabilities provided from the remote once the sender has successfully opened.
-     *
-     * @throws ClientException if an error occurs while obtaining the {@link Sender} remote desired capabilities.
-     */
-    String[] desiredCapabilities() throws ClientException;
-
-    /**
-     * @return the {@link Client} instance that holds this session's {@link Sender}
-     */
-    Client client();
-
-    /**
-     * @return the {@link Connection} instance that holds this session's {@link Sender}
-     */
-    Connection connection();
-
-    /**
-     * @return the {@link Session} that created and holds this {@link Sender}.
-     */
-    Session session();
+public interface Sender extends Link<Sender> {
 
     /**
      * Send the given message immediately if there is credit available or blocks if the link
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
index 7097d43b..8e3cba23 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/SenderOptions.java
@@ -16,34 +16,16 @@
  */
 package org.apache.qpid.protonj2.client;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
 import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 
 /**
  * Options that control the behavior of a {@link Sender} created from them.
  */
-public class SenderOptions {
+public class SenderOptions extends LinkOptions<SenderOptions> {
 
     private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
-    private long requestTimeout = ConnectionOptions.DEFAULT_REQUEST_TIMEOUT;
-    private long openTimeout = ConnectionOptions.DEFAULT_OPEN_TIMEOUT;
-    private long closeTimeout = ConnectionOptions.DEFAULT_CLOSE_TIMEOUT;
-
-    private String linkName;
-    private boolean autoSettle = true;
-    private DeliveryMode deliveryMode = DeliveryMode.AT_LEAST_ONCE;
-
-    private final SourceOptions source = new SourceOptions();
-    private final TargetOptions target = new TargetOptions();
-
-    private String[] offeredCapabilities;
-    private String[] desiredCapabilities;
-    private Map<String, Object> properties;
 
     /**
      * Create a new {@link SenderOptions} instance configured with default configuration settings.
@@ -63,147 +45,6 @@ public class SenderOptions {
         }
     }
 
-    /**
-     * Configures the link name to use when creating a given {@link Sender} instance.
-     *
-     * @param linkName
-     *      The assigned link name to use when creating a {@link Sender}.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions linkName(String linkName) {
-        this.linkName = linkName;
-        return this;
-    }
-
-    /**
-     * @return the configured link name to use when creating a {@link Sender}.
-     */
-    public String linkName() {
-        return linkName;
-    }
-
-    /**
-     * Sets whether sent deliveries should be automatically locally-settled once
-     * they have become remotely-settled by the receiving peer.
-     *
-     * True by default.
-     *
-     * @param autoSettle
-     *            whether deliveries should be auto settled locally after being
-     *            settled by the receiver
-     *
-     * @return the sender
-     */
-    public SenderOptions autoSettle(boolean autoSettle) {
-        this.autoSettle = autoSettle;
-        return this;
-    }
-
-    /**
-     * Get whether the {@link Sender} is auto settling deliveries.
-     *
-     * @return whether deliveries should be auto settled locally after being settled
-     *         by the receiver
-     *
-     * @see #autoSettle(boolean)
-     */
-    public boolean autoSettle() {
-        return autoSettle;
-    }
-
-    /**
-     * Sets the {@link DeliveryMode} value to assign to newly created {@link Sender} instances.
-     *
-     * @param deliveryMode
-     *      The delivery mode value to configure.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions deliveryMode(DeliveryMode deliveryMode) {
-        this.deliveryMode = deliveryMode;
-        return this;
-    }
-
-    /**
-     * @return the current value of the {@link Sender} delivery mode configuration.
-     */
-    public DeliveryMode deliveryMode() {
-        return deliveryMode;
-    }
-
-    /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Sender} is closed.
-     */
-    public long closeTimeout() {
-        return closeTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Sender} link.
-     *
-     * @param closeTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions closeTimeout(long closeTimeout) {
-        return closeTimeout(closeTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to close
-     * the {@link Sender} link.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions closeTimeout(long timeout, TimeUnit units) {
-        this.closeTimeout = units.toMillis(timeout);
-        return this;
-    }
-
-    /**
-     * @return the timeout used when awaiting a response from the remote when a {@link Sender} is opened.
-     */
-    public long openTimeout() {
-        return openTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Sender} has been honored.
-     *
-     * @param openTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions openTimeout(long openTimeout) {
-        return openTimeout(openTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to open
-     * a {@link Sender} has been honored.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions openTimeout(long timeout, TimeUnit units) {
-        this.openTimeout = units.toMillis(timeout);
-        return this;
-    }
-
     /**
      * @return the timeout used when awaiting a response from the remote when a message is sent.
      */
@@ -244,113 +85,6 @@ public class SenderOptions {
         return this;
     }
 
-    /**
-     * @return the timeout used when awaiting a response from the remote when a resource makes a request.
-     */
-    public long requestTimeout() {
-        return requestTimeout;
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to
-     * perform some action such as starting a new transaction.  If the remote does not respond
-     * within the configured timeout the resource making the request will mark it as failed and
-     * return an error to the request initiator usually in the form of a
-     * {@link ClientOperationTimedOutException}.
-     *
-     * @param requestTimeout
-     *      Timeout value in milliseconds to wait for a remote response.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions requestTimeout(long requestTimeout) {
-        return requestTimeout(requestTimeout, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Configures the timeout used when awaiting a response from the remote that a request to
-     * perform some action such as starting a new transaction.  If the remote does not respond
-     * within the configured timeout the resource making the request will mark it as failed and
-     * return an error to the request initiator usually in the form of a
-     * {@link ClientOperationTimedOutException}.
-     *
-     * @param timeout
-     *      Timeout value to wait for a remote response.
-     * @param units
-     * 		The {@link TimeUnit} that defines the timeout span.
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions requestTimeout(long timeout, TimeUnit units) {
-        this.requestTimeout = units.toMillis(timeout);
-        return this;
-    }
-
-    /**
-     * @return the offeredCapabilities
-     */
-    public String[] offeredCapabilities() {
-        return offeredCapabilities;
-    }
-
-    /**
-     * @param offeredCapabilities the offeredCapabilities to set
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions offeredCapabilities(String... offeredCapabilities) {
-        this.offeredCapabilities = offeredCapabilities;
-        return this;
-    }
-
-    /**
-     * @return the desiredCapabilities
-     */
-    public String[] desiredCapabilities() {
-        return desiredCapabilities;
-    }
-
-    /**
-     * @param desiredCapabilities the desiredCapabilities to set
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions desiredCapabilities(String... desiredCapabilities) {
-        this.desiredCapabilities = desiredCapabilities;
-        return this;
-    }
-
-    /**
-     * @return the properties
-     */
-    public Map<String, Object> properties() {
-        return properties;
-    }
-
-    /**
-     * @param properties the properties to set
-     *
-     * @return this {@link SenderOptions} instance.
-     */
-    public SenderOptions properties(Map<String, Object> properties) {
-        this.properties = properties;
-        return this;
-    }
-
-    /**
-     * @return the source
-     */
-    public SourceOptions sourceOptions() {
-        return source;
-    }
-
-    /**
-     * @return the target
-     */
-    public TargetOptions targetOptions() {
-        return target;
-    }
-
     @Override
     public SenderOptions clone() {
         return copyInto(new SenderOptions());
@@ -366,26 +100,15 @@ public class SenderOptions {
      * @return this options class for chaining.
      */
     protected SenderOptions copyInto(SenderOptions other) {
-        other.autoSettle(autoSettle);
-        other.linkName(linkName);
-        other.closeTimeout(closeTimeout);
-        other.openTimeout(openTimeout);
-        other.sendTimeout(sendTimeout);
-        other.requestTimeout(requestTimeout);
+        super.copyInto(other);
 
-        if (offeredCapabilities != null) {
-            other.offeredCapabilities(Arrays.copyOf(offeredCapabilities, offeredCapabilities.length));
-        }
-        if (desiredCapabilities != null) {
-            other.desiredCapabilities(Arrays.copyOf(desiredCapabilities, desiredCapabilities.length));
-        }
-        if (properties != null) {
-            other.properties(new HashMap<>(properties));
-        }
+        other.sendTimeout(sendTimeout);
 
-        source.copyInto(other.sourceOptions());
-        target.copyInto(other.targetOptions());
+        return this;
+    }
 
+    @Override
+    protected SenderOptions self() {
         return this;
     }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamDelivery.java
index 4d1b2969..7a971732 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamDelivery.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamDelivery.java
@@ -16,31 +16,60 @@
  */
 package org.apache.qpid.protonj2.client;
 
+import java.io.InputStream;
+import java.util.Map;
+
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
 import org.apache.qpid.protonj2.types.transport.Transfer;
 
 /**
- * A specialized {@link Delivery} type that is returned from the {@link StreamReceiver}
+ * A specialized delivery type that is returned from the {@link StreamReceiver}
  * which can be used to read incoming large messages that are streamed via multiple incoming
  * AMQP {@link Transfer} frames.
  */
-public interface StreamDelivery extends Delivery {
+public interface StreamDelivery {
 
     /**
      * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
      */
-    @Override
     StreamReceiver receiver();
 
     /**
-     * {@inheritDoc}
+     * Decode the {@link StreamDelivery} payload and return a {@link Message} object.
+     * <p>
+     * If the incoming message carried any delivery annotations they can be accessed via the
+     * {@link #annotations()} method.  Re-sending the returned message will not also
+     * send the incoming delivery annotations; the sender must include them in the
+     * {@link Sender#send(Message, Map)} call if they are to be forwarded onto the next recipient.
+     * <p>
+     * Calling this method claims the payload of the delivery for the returned {@link Message} and
+     * excludes use of the {@link #rawInputStream()} method of the {@link StreamDelivery} object.  Calling
+     * the {@link #rawInputStream()} method after calling this method throws {@link ClientIllegalStateException}.
+     *
+     * @return a {@link Message} instance that wraps the decoded payload.
      *
-     * @return a {@link StreamReceiverMessage} instance that can be used to read the incoming message stream.
+     * @throws ClientException if an error occurs while decoding the payload.
+     *
+     * @param <E> The type of message body that should be contained in the returned {@link Message}.
      */
-    @SuppressWarnings("unchecked")
-    @Override
     StreamReceiverMessage message() throws ClientException;
 
+    /**
+     * Create and return an {@link InputStream} that reads the raw payload bytes of the given {@link StreamDelivery}.
+     * <p>
+     * Calling this method claims the payload of the delivery for the returned {@link InputStream} and excludes
+     * use of the {@link #message()} and {@link #annotations()} methods of the {@link StreamDelivery} object.  Closing
+     * the returned input stream discards any unread bytes from the delivery payload.  Calling the {@link #message()}
+     * or {@link #annotations()} methods after calling this method throws {@link ClientIllegalStateException}.
+     *
+     * @return an {@link InputStream} instance that can be used to read the raw delivery payload.
+     *
+     * @throws ClientException if an error occurs while decoding the payload.
+     */
+    InputStream rawInputStream() throws ClientException;
+
     /**
      * Check if the {@link StreamDelivery} has been marked as aborted by the remote sender.
      *
@@ -56,51 +85,132 @@ public interface StreamDelivery extends Delivery {
     boolean completed();
 
     /**
-     * {@inheritDoc}
+     * Decodes the {@link StreamDelivery} payload and returns a {@link Map} containing a copy
+     * of any associated {@link DeliveryAnnotations} that were transmitted with the {@link Message}
+     * payload of this {@link StreamDelivery}.
+     * <p>
+     * Calling this method claims the payload of the delivery for the returned {@link Map} and the decoded
+     * {@link Message} that can be accessed via the {@link #message()} method and excludes use of the
+     * {@link #rawInputStream()} method of the {@link StreamDelivery} object.  Calling the {@link #rawInputStream()}
+     * method after calling this method throws {@link ClientIllegalStateException}.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @return copy of the delivery annotations that were transmitted with the {@link Message} payload.
+     *
+     * @throws ClientException if an error occurs while decoding the payload.
+     */
+    Map<String, Object> annotations() throws ClientException;
+
+    /**
+     * Accepts and settles the delivery.
+     *
+     * @return this {@link StreamDelivery} instance.
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery accept() throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Releases and settles the delivery.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @return this {@link StreamDelivery} instance.
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery release() throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Rejects and settles the delivery, sending supplied error information along
+     * with the rejection.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @param condition
+     *      The error condition value to supply with the rejection.
+     * @param description
+     *      The error description value to supply with the rejection.
+     *
+     * @return this {@link StreamDelivery} instance.
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery reject(String condition, String description) throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Modifies and settles the delivery.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @param deliveryFailed
+     *      Indicates if the modified delivery failed.
+     * @param undeliverableHere
+     *      Indicates if the modified delivery should not be returned here again.
+     *
+     * @return this {@link StreamDelivery} instance.
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Updates the DeliveryState, and optionally settles the delivery as well.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @param state
+     *            the delivery state to apply
+     * @param settle
+     *            whether to {@link #settle()} the delivery at the same time
+     *
+     * @return this {@link StreamDelivery} instance.
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery disposition(DeliveryState state, boolean settle) throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Settles the delivery locally.
      *
-     * @return the {@link StreamReceiver} that originated this {@link StreamDelivery}.
+     * @return the delivery
+     *
+     * @throws ClientException if an error occurs while sending the disposition
      */
-    @Override
     StreamDelivery settle() throws ClientException;
 
+    /**
+     * @return true if the delivery has been locally settled.
+     *
+     * @throws ClientException if an error occurs while reading the settled state
+     */
+    boolean settled() throws ClientException;
+
+    /**
+     * Gets the current local state for the delivery.
+     *
+     * @return the delivery state
+     *
+     * @throws ClientException if an error occurs while reading the delivery state
+     */
+    DeliveryState state() throws ClientException;
+
+    /**
+     * Gets the current remote state for the delivery.
+     *
+     * @return the remote delivery state
+     *
+     * @throws ClientException if an error occurs while reading the remote delivery state
+     */
+    DeliveryState remoteState() throws ClientException;
+
+    /**
+     * Gets whether the delivery has been settled by the remote peer.
+     *
+     * @return whether the delivery is remotely settled
+     *
+     * @throws ClientException if an error occurs while reading the remote settlement state
+     */
+    boolean remoteSettled() throws ClientException;
+
+    /**
+     * Gets the message format for the current delivery.
+     *
+     * @return the message format
+     *
+     * @throws ClientException if an error occurs while reading the delivery message format
+     */
+    int messageFormat() throws ClientException;
+
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
index 86e1b73e..3de8aa76 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiver.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.protonj2.client;
 
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
@@ -25,7 +26,34 @@ import org.apache.qpid.protonj2.types.transport.Transfer;
  * A receiver of large message content that is delivered in multiple {@link Transfer} frames from
  * the remote.
  */
-public interface StreamReceiver extends Receiver {
+public interface StreamReceiver extends Link<StreamReceiver> {
+
+    /**
+     * Adds credit to the {@link StreamReceiver} link for use when the receiver has not been configured
+     * with a credit window.  When a credit window is configured credit replenishment is automatic and
+     * calling this method will result in an exception indicating that the operation is invalid.
+     * <p>
+     * If the {@link Receiver} is draining and this method is called an exception will be thrown
+     * to indicate that credit cannot be replenished until the remote has drained the existing link
+     * credit.
+     *
+     * @param credits
+     *      The number of credits to add to the {@link Receiver} link.
+     *
+     * @return this {@link StreamReceiver} instance.
+     *
+     * @throws ClientException if an error occurs while attempting to add new {@link StreamReceiver} link credit.
+     */
+    StreamReceiver addCredit(int credits) throws ClientException;
+
+    /**
+     * Requests the remote to drain previously granted credit for this {@link StreamReceiver} link.
+     *
+     * @return a {@link Future} that will be completed when the remote drains this {@link StreamReceiver} link.
+     *
+     * @throws ClientException if an error occurs while attempting to drain the link credit.
+     */
+    Future<StreamReceiver> drain() throws ClientException;
 
     /**
      * Blocking receive method that waits forever for the remote to provide a {@link StreamReceiverMessage} for consumption.
@@ -39,7 +67,6 @@ public interface StreamReceiver extends Receiver {
      *
      * @throws ClientException if the {@link StreamReceiver} or its parent is closed when the call to receive is made.
      */
-    @Override
     StreamDelivery receive() throws ClientException;
 
     /**
@@ -64,7 +91,6 @@ public interface StreamReceiver extends Receiver {
      *
      * @throws ClientException if the {@link StreamReceiver} or its parent is closed when the call to receive is made.
      */
-    @Override
     StreamDelivery receive(long timeout, TimeUnit unit) throws ClientException;
 
     /**
@@ -75,20 +101,17 @@ public interface StreamReceiver extends Receiver {
      *
      * @throws ClientException if the {@link StreamReceiver} or its parent is closed when the call to try to receive is made.
      */
-    @Override
     StreamDelivery tryReceive() throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Returns the number of Deliveries that are currently held in the {@link Receiver} delivery
+     * queue.  This number is likely to change immediately following the call as more deliveries
+     * arrive but can be used to determine if any pending {@link Delivery} work is ready.
      *
-     * @param credits
-     *      credit The number of credits to add to the {@link StreamReceiver} link.
-     *
-     * @return this {@link StreamReceiver} instance.
+     * @return the number of deliveries that are currently buffered locally.
      *
-     * @throws ClientException if an error occurs while attempting to add new {@link StreamReceiver} link credit.
+     * @throws ClientException if an error occurs while attempting to fetch the queue count.
      */
-    @Override
-    StreamReceiver addCredit(int credits) throws ClientException;
+    long queuedDeliveries() throws ClientException;
 
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiverOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiverOptions.java
index 6f3eff8f..3d52fecd 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiverOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamReceiverOptions.java
@@ -16,14 +16,13 @@
  */
 package org.apache.qpid.protonj2.client;
 
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Options class that controls various aspects of a {@link StreamReceiver} instance and how
  * a streamed message transfer is written.
  */
-public class StreamReceiverOptions extends ReceiverOptions {
+public class StreamReceiverOptions extends LinkOptions<StreamReceiverOptions> {
 
     /**
      * Defines the default read buffering size which is used to control how much incoming
@@ -33,6 +32,9 @@ public class StreamReceiverOptions extends ReceiverOptions {
     public static final int DEFAULT_READ_BUFFER_SIZE = SessionOptions.DEFAULT_SESSION_INCOMING_CAPACITY;
 
     private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
+    private long drainTimeout = ConnectionOptions.DEFAULT_DRAIN_TIMEOUT;
+    private boolean autoAccept = true;
+    private int creditWindow = 10;
 
     /**
      * Creates a {@link StreamReceiverOptions} instance with default values for all options
@@ -71,6 +73,9 @@ public class StreamReceiverOptions extends ReceiverOptions {
         super.copyInto(other);
 
         other.readBufferSize(readBufferSize);
+        other.autoAccept(autoAccept);
+        other.creditWindow(creditWindow);
+        other.drainTimeout(drainTimeout);
 
         return this;
     }
@@ -100,90 +105,91 @@ public class StreamReceiverOptions extends ReceiverOptions {
         return this;
     }
 
-    //----- Override super methods to customize the return type
-
-    @Override
+    /**
+     * Controls if the created Receiver will automatically accept the deliveries that have
+     * been received by the application (default is <code>true</code>).
+     *
+     * @param autoAccept
+     *      The value to assign for auto delivery acceptance.
+     *
+     * @return this {@link StreamReceiverOptions} instance.
+     */
     public StreamReceiverOptions autoAccept(boolean autoAccept) {
-        return (StreamReceiverOptions) super.autoAccept(autoAccept);
-    }
-
-    @Override
-    public StreamReceiverOptions autoSettle(boolean autoSettle) {
-        return (StreamReceiverOptions) super.autoSettle(autoSettle);
+        this.autoAccept = autoAccept;
+        return this;
     }
 
-    @Override
-    public StreamReceiverOptions deliveryMode(DeliveryMode deliveryMode) {
-        return (StreamReceiverOptions) super.deliveryMode(deliveryMode);
+    /**
+     * @return the current value of the {@link Receiver} auto accept setting.
+     */
+    public boolean autoAccept() {
+        return autoAccept;
     }
 
-    @Override
-    public StreamReceiverOptions linkName(String linkName) {
-        return (StreamReceiverOptions) super.linkName(linkName);
+    /**
+     * @return the credit window configuration that will be applied to created {@link Receiver} instances.
+     */
+    public int creditWindow() {
+        return creditWindow;
     }
 
-    @Override
+    /**
+     * A credit window value that will be used to maintain a window of credit for Receiver instances
+     * that are created.  The {@link Receiver} will allow up to the credit window amount of incoming
+     * deliveries to be queued and as they are read from the {@link Receiver} the window will be extended
+     * to maintain a consistent backlog of deliveries.  The default is to configure a credit window of 10.
+     * <p>
+     * To disable credit windowing and allow the client application to control the credit on the {@link Receiver}
+     * link the credit window value should be set to zero.
+     *
+     * @param creditWindow
+     *      The assigned credit window value to use.
+     *
+     * @return this {@link StreamReceiverOptions} instance.
+     */
     public StreamReceiverOptions creditWindow(int creditWindow) {
-        return (StreamReceiverOptions) super.creditWindow(creditWindow);
-    }
-
-    @Override
-    public StreamReceiverOptions closeTimeout(long closeTimeout) {
-        return (StreamReceiverOptions) super.closeTimeout(closeTimeout);
-    }
-
-    @Override
-    public StreamReceiverOptions closeTimeout(long timeout, TimeUnit units) {
-        return (StreamReceiverOptions) super.closeTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamReceiverOptions openTimeout(long openTimeout) {
-        return (StreamReceiverOptions) super.openTimeout(openTimeout);
+        this.creditWindow = creditWindow;
+        return this;
     }
 
-    @Override
-    public StreamReceiverOptions openTimeout(long timeout, TimeUnit units) {
-        return (StreamReceiverOptions) super.openTimeout(timeout, units);
+    /**
+     * @return the configured drain timeout value that will be used to fail a pending drain request.
+     */
+    public long drainTimeout() {
+        return drainTimeout;
     }
 
-    @Override
+    /**
+     * Sets the drain timeout (in milliseconds) after which a {@link Receiver} request to drain
+     * link credit is considered failed and the request will be marked as such.
+     *
+     * @param drainTimeout
+     *      the drainTimeout to use for receiver links.
+     *
+     * @return this {@link StreamReceiverOptions} instance.
+     */
     public StreamReceiverOptions drainTimeout(long drainTimeout) {
-        return (StreamReceiverOptions) super.drainTimeout(drainTimeout);
+        return drainTimeout(drainTimeout, TimeUnit.MILLISECONDS);
     }
 
-    @Override
+    /**
+     * Sets the drain timeout value after which a {@link Receiver} request to drain
+     * link credit is considered failed and the request will be marked as such.
+     *
+     * @param timeout
+     *      Timeout value to wait for a remote response.
+     * @param units
+     *      The {@link TimeUnit} that defines the timeout span.
+     *
+     * @return this {@link StreamReceiverOptions} instance.
+     */
     public StreamReceiverOptions drainTimeout(long timeout, TimeUnit units) {
-        return (StreamReceiverOptions) super.drainTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamReceiverOptions requestTimeout(long requestTimeout) {
-        return (StreamReceiverOptions) super.requestTimeout(requestTimeout);
-    }
-
-    @Override
-    public StreamReceiverOptions requestTimeout(long timeout, TimeUnit units) {
-        return (StreamReceiverOptions) super.requestTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamReceiverOptions offeredCapabilities(String... offeredCapabilities) {
-        return (StreamReceiverOptions) super.offeredCapabilities(offeredCapabilities);
-    }
-
-    @Override
-    public StreamReceiverOptions desiredCapabilities(String... desiredCapabilities) {
-        return (StreamReceiverOptions) super.desiredCapabilities(desiredCapabilities);
-    }
-
-    @Override
-    public StreamReceiverOptions properties(Map<String, Object> properties) {
-        return (StreamReceiverOptions) super.properties(properties);
+        this.drainTimeout = units.toMillis(timeout);
+        return this;
     }
 
     @Override
-    protected StreamReceiverOptions copyInto(ReceiverOptions other) {
-        return (StreamReceiverOptions) super.copyInto(other);
+    protected StreamReceiverOptions self() {
+        return this;
     }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSender.java
index 0f7cb577..9f34ac04 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSender.java
@@ -24,7 +24,63 @@ import org.apache.qpid.protonj2.client.exceptions.ClientException;
  * Sending link implementation that allows sending of large message payload data in
  * multiple transfers to reduce memory overhead of large message sends.
  */
-public interface StreamSender extends Sender {
+public interface StreamSender extends Link<StreamSender> {
+
+    /**
+     * Send the given message immediately if there is credit available or blocks if the link
+     * has not yet been granted credit or there is a streaming send ongoing.
+     *
+     * @param message
+     *      the {@link Message} to send.
+     *
+     * @return the {@link StreamTracker} for the message delivery
+     *
+     * @throws ClientException if an error occurs while initiating the send operation.
+     */
+    StreamTracker send(Message<?> message) throws ClientException;
+
+    /**
+     * Send the given message immediately if there is credit available or blocks if the link
+     * has not yet been granted credit or there is a streaming send ongoing.
+     *
+     * @param message
+     *      the {@link Message} to send.
+     * @param deliveryAnnotations
+     *      the delivery annotations that should be included in the sent {@link Message}.
+     *
+     * @return the {@link StreamTracker} for the message delivery
+     *
+     * @throws ClientException if an error occurs while initiating the send operation.
+     */
+    StreamTracker send(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException;
+
+    /**
+     * Send the given message if credit is available or returns null if no credit has been
+     * granted to the link at the time of the send attempt or a streaming send is ongoing.
+     *
+     * @param message
+     *      the {@link Message} to send if credit is available.
+     *
+     * @return the {@link StreamTracker} for the message delivery or null if no credit for sending.
+     *
+     * @throws ClientException if an error occurs while initiating the send operation.
+     */
+    StreamTracker trySend(Message<?> message) throws ClientException;
+
+    /**
+     * Send the given message if credit is available or returns null if no credit has been
+     * granted to the link at the time of the send attempt or a streaming send is ongoing.
+     *
+     * @param message
+     *      the {@link Message} to send if credit is available.
+     * @param deliveryAnnotations
+     *      the delivery annotations that should be included in the sent {@link Message}.
+     *
+     * @return the {@link StreamTracker} for the message delivery or null if no credit for sending.
+     *
+     * @throws ClientException if an error occurs while initiating the send operation.
+     */
+    StreamTracker trySend(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException;
 
     /**
      * Creates and returns a new {@link StreamSenderMessage} that can be used by the caller to perform
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
index 3ad5fbb4..a519b2e3 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamSenderOptions.java
@@ -16,14 +16,15 @@
  */
 package org.apache.qpid.protonj2.client;
 
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
+
 /**
  * Options class that controls various aspects of a {@link StreamSenderMessage} instance and how
  * a streamed message transfer is written.
  */
-public class StreamSenderOptions extends SenderOptions {
+public class StreamSenderOptions extends LinkOptions<StreamSenderOptions> {
 
     /**
      * Defines the default pending write buffering size which is used to control how much outgoing
@@ -32,6 +33,8 @@ public class StreamSenderOptions extends SenderOptions {
      */
     public static final int DEFAULT_PENDING_WRITES_BUFFER_SIZE = SessionOptions.DEFAULT_SESSION_OUTGOING_CAPACITY;
 
+    private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
+
     private int pendingWritesBufferSize = DEFAULT_PENDING_WRITES_BUFFER_SIZE;
 
     /**
@@ -79,6 +82,8 @@ public class StreamSenderOptions extends SenderOptions {
         super.copyInto(other);
 
         other.writeBufferSize(writeBufferSize);
+        other.sendTimeout(sendTimeout);
+        other.pendingWritesBufferSize(pendingWritesBufferSize);
 
         return this;
     }
@@ -128,75 +133,48 @@ public class StreamSenderOptions extends SenderOptions {
         return this;
     }
 
-    //----- Override super methods to return this options type for ease of use
-
-    @Override
-    public StreamSenderOptions linkName(String linkName) {
-        return (StreamSenderOptions) super.linkName(linkName);
-    }
-
-    @Override
-    public StreamSenderOptions autoSettle(boolean autoSettle) {
-        return (StreamSenderOptions) super.autoSettle(autoSettle);
-    }
-
-    @Override
-    public StreamSenderOptions deliveryMode(DeliveryMode deliveryMode) {
-        return (StreamSenderOptions) super.deliveryMode(deliveryMode);
-    }
-
-    @Override
-    public StreamSenderOptions closeTimeout(long closeTimeout) {
-        return (StreamSenderOptions) super.closeTimeout(closeTimeout);
-    }
-
-    @Override
-    public StreamSenderOptions closeTimeout(long timeout, TimeUnit units) {
-        return (StreamSenderOptions) super.closeTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamSenderOptions openTimeout(long openTimeout) {
-        return (StreamSenderOptions) super.openTimeout(openTimeout);
-    }
-
-    @Override
-    public StreamSenderOptions openTimeout(long timeout, TimeUnit units) {
-        return (StreamSenderOptions) super.openTimeout(timeout, units);
+    /**
+     * @return the timeout used when awaiting the completion of a message send operation.
+     */
+    public long sendTimeout() {
+        return sendTimeout;
     }
 
-    @Override
+    /**
+     * Configures the timeout used when awaiting a send operation to complete.  A send will block if the
+     * remote has not granted the {@link Sender} or the {@link Session} credit to do so. If the send blocks
+     * for longer than this timeout the send call will fail with a {@link ClientSendTimedOutException}
+     * to indicate that the send did not complete.
+     *
+     * @param sendTimeout
+     *      Timeout value in milliseconds to wait for a remote response.
+     *
+     * @return this {@link StreamSenderOptions} instance.
+     */
     public StreamSenderOptions sendTimeout(long sendTimeout) {
-        return (StreamSenderOptions) super.sendTimeout(sendTimeout);
+        return sendTimeout(sendTimeout, TimeUnit.MILLISECONDS);
     }
 
-    @Override
+    /**
+     * Configures the timeout used when awaiting a send operation to complete.  A send will block if the
+     * remote has not granted the {@link Sender} or the {@link Session} credit to do so. If the send blocks
+     * for longer than this timeout the send call will fail with a {@link ClientSendTimedOutException}
+     * to indicate that the send did not complete.
+     *
+     * @param timeout
+     *      Timeout value to wait for a remote response.
+     * @param units
+     *      The {@link TimeUnit} that defines the timeout span.
+     *
+     * @return this {@link StreamSenderOptions} instance.
+     */
     public StreamSenderOptions sendTimeout(long timeout, TimeUnit units) {
-        return (StreamSenderOptions) super.sendTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamSenderOptions requestTimeout(long requestTimeout) {
-        return (StreamSenderOptions) super.requestTimeout(requestTimeout);
-    }
-
-    @Override
-    public StreamSenderOptions requestTimeout(long timeout, TimeUnit units) {
-        return (StreamSenderOptions) super.requestTimeout(timeout, units);
-    }
-
-    @Override
-    public StreamSenderOptions offeredCapabilities(String... offeredCapabilities) {
-        return (StreamSenderOptions) super.offeredCapabilities(offeredCapabilities);
-    }
-
-    @Override
-    public StreamSenderOptions desiredCapabilities(String... desiredCapabilities) {
-        return (StreamSenderOptions) super.desiredCapabilities(desiredCapabilities);
+        this.sendTimeout = units.toMillis(timeout);
+        return this;
     }
 
     @Override
-    public StreamSenderOptions properties(Map<String, Object> properties) {
-        return (StreamSenderOptions) super.properties(properties);
+    protected StreamSenderOptions self() {
+        return this;
     }
 }
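Editorial sketch (not part of the commit): the StreamSenderOptions change above replaces a long run of covariant overrides with a generic base type plus a self() hook. The block below illustrates that general self-typed options pattern under stated assumptions. BaseOptions and StreamOptions are hypothetical stand-ins, not the real LinkOptions or StreamSenderOptions classes, and the default values are invented for the example.

```java
import java.util.concurrent.TimeUnit;

public class SelfTypedOptionsSketch {

    // Hypothetical stand-in for a shared options base; NOT the real LinkOptions class.
    abstract static class BaseOptions<T extends BaseOptions<T>> {
        private long openTimeout = 15_000; // illustrative default, in milliseconds

        // Each subclass returns 'this' so shared setters keep the concrete type.
        protected abstract T self();

        public long openTimeout() {
            return openTimeout;
        }

        public T openTimeout(long timeout, TimeUnit units) {
            this.openTimeout = units.toMillis(timeout);
            return self();
        }
    }

    // Hypothetical concrete options type that adds its own send timeout.
    static final class StreamOptions extends BaseOptions<StreamOptions> {
        private long sendTimeout = -1; // illustrative default

        public long sendTimeout() {
            return sendTimeout;
        }

        // The millisecond overload delegates to the TimeUnit overload.
        public StreamOptions sendTimeout(long sendTimeout) {
            return sendTimeout(sendTimeout, TimeUnit.MILLISECONDS);
        }

        public StreamOptions sendTimeout(long timeout, TimeUnit units) {
            this.sendTimeout = units.toMillis(timeout);
            return this;
        }

        @Override
        protected StreamOptions self() {
            return this;
        }
    }

    public static void main(String[] args) {
        // The chain stays typed as StreamOptions even after the base class setter,
        // so no cast or per-method override is needed.
        StreamOptions options = new StreamOptions()
            .openTimeout(2, TimeUnit.SECONDS)
            .sendTimeout(500);

        System.out.println(options.openTimeout() + " " + options.sendTimeout());
    }
}
```

With this shape a base class setter can be followed directly by a subclass setter in one fluent chain, which is what the removed "Override super methods to return this options type" block previously achieved by hand.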
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamTracker.java
index 362758fe..1e9671a9 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/StreamTracker.java
@@ -19,58 +19,133 @@ package org.apache.qpid.protonj2.client;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.types.messaging.Accepted;
+import org.apache.qpid.protonj2.types.transport.Disposition;
 
 /**
  * Special StreamSender related {@link Tracker} that is linked to any {@link StreamSenderMessage}
  * instance and provides the {@link Tracker} functions for those types of messages.
  */
-public interface StreamTracker extends Tracker {
+public interface StreamTracker {
 
     /**
-     * {@inheritDoc}
-     *
-     * @return the {@link StreamSender} that is associated with this {@link StreamTracker}.
+     * @return the {@link StreamSender} that was used to send the delivery that is being tracked.
      */
-    @Override
     StreamSender sender();
 
     /**
-     * {@inheritDoc}
+     * Settles the delivery locally, if not {@link SenderOptions#autoSettle() auto-settling}.
+     *
+     * @return this {@link StreamTracker} instance.
      *
-     * @return this {@link StreamTracker} instance.
+     * @throws ClientException if an error occurs while performing the settlement.
      */
-    @Override
     StreamTracker settle() throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * @return true if the sent message has been locally settled.
      */
-    @Override
-    Future<Tracker> settlementFuture();
+    boolean settled();
 
     /**
-     * {@inheritDoc}
+     * Gets the current local state for the tracked delivery.
      *
-     * @return this {@link StreamTracker} instance.
+     * @return the delivery state
+     */
+    DeliveryState state();
+
+    /**
+     * Gets the current remote state for the tracked delivery.
+     *
+     * @return the remote {@link DeliveryState} once a value is received from the remote.
+     */
+    DeliveryState remoteState();
+
+    /**
+     * Gets whether the delivery has been settled by the remote peer.
+     *
+     * @return whether the delivery is remotely settled
+     */
+    boolean remoteSettled();
+
+    /**
+     * Updates the DeliveryState and optionally settles the delivery as well.
+     *
+     * @param state
+     *            the delivery state to apply
+     * @param settle
+     *            whether to {@link #settle()} the delivery at the same time
+     *
+     * @return this {@link StreamTracker} instance.
+     *
+     * @throws ClientException if an error occurs while applying the given disposition
      */
-    @Override
     StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Returns a future that can be used to wait for the remote to acknowledge receipt of
+     * a sent message by settling it.
+     *
+     * @return a {@link Future} that can be used to wait on remote settlement.
+     */
+    Future<StreamTracker> settlementFuture();
+
+    /**
+     * Waits if necessary for the remote to settle the sent delivery unless it has
+     * either already been settled or the original delivery was sent settled in which
+     * case the remote will not send a {@link Disposition} back.
      *
-     * @return this {@link StreamTracker} instance.
+     * @return this {@link StreamTracker} instance.
+     *
+     * @throws ClientException if an error occurs while awaiting the remote settlement.
      */
-    @Override
     StreamTracker awaitSettlement() throws ClientException;
 
     /**
-     * {@inheritDoc}
+     * Waits if necessary for the remote to settle the sent delivery unless it has
+     * either already been settled or the original delivery was sent settled in which
+     * case the remote will not send a {@link Disposition} back.
+     *
+     * @param timeout
+     *      the maximum time to wait for the remote to settle.
+     * @param unit
+     *      the time unit of the timeout argument.
      *
-     * @return this {@link StreamTracker} instance.
+     * @return this {@link StreamTracker} instance.
+     *
+     * @throws ClientException if an error occurs while awaiting the remote settlement.
      */
-    @Override
     StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException;
 
+    /**
+     * Waits if necessary for the remote to settle the sent delivery with an {@link Accepted}
+     * disposition unless it has either already been settled and accepted or the original delivery
+     * was sent settled in which case the remote will not send a {@link Disposition} back.
+     *
+     * @return this {@link StreamTracker} instance.
+     *
+     * @throws ClientDeliveryStateException if the remote sends a disposition other than Accepted.
+     * @throws ClientException if an error occurs while awaiting the remote settlement.
+     */
+    StreamTracker awaitAccepted() throws ClientException;
+
+    /**
+     * Waits if necessary for the remote to settle the sent delivery with an {@link Accepted}
+     * disposition unless it has either already been settled and accepted or the original delivery
+     * was sent settled in which case the remote will not send a {@link Disposition} back.
+     *
+     * @param timeout
+     *      the maximum time to wait for the remote to settle.
+     * @param unit
+     *      the time unit of the timeout argument.
+     *
+     * @return this {@link StreamTracker} instance.
+     *
+     * @throws ClientDeliveryStateException if the remote sends a disposition other than Accepted.
+     * @throws ClientException if an error occurs while awaiting the remote settlement.
+     */
+    StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException;
+
 }
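Editorial sketch (not part of the commit): StreamTracker now stands alone rather than extending Tracker, and settlementFuture() returns a Future typed to the tracker itself. The block below is a rough illustration of how a settlement future and a timed await could relate. SketchTracker and markRemotelySettled() are hypothetical names, the CompletableFuture backing is an assumption, and the sketch throws unchecked exceptions for brevity where the real API declares ClientException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TrackerSketch {

    // Hypothetical tracker; NOT the real StreamTracker or its client implementation.
    static final class SketchTracker {
        private final CompletableFuture<SketchTracker> remoteSettlement = new CompletableFuture<>();

        // The future is typed to this tracker, so callers get the tracker back without a cast.
        Future<SketchTracker> settlementFuture() {
            return remoteSettlement;
        }

        // Hypothetical hook that an I/O layer would call when the remote settles.
        void markRemotelySettled() {
            remoteSettlement.complete(this);
        }

        // Blocks until remote settlement or until the timeout elapses.
        SketchTracker awaitSettlement(long timeout, TimeUnit unit) {
            try {
                return remoteSettlement.get(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                throw new IllegalStateException("Interrupted while awaiting settlement", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Settlement failed", e.getCause());
            } catch (TimeoutException e) {
                throw new IllegalStateException("Timed out awaiting settlement", e);
            }
        }
    }

    public static void main(String[] args) {
        SketchTracker tracker = new SketchTracker();

        // Simulate the remote peer settling the delivery.
        tracker.markRemotelySettled();

        SketchTracker settled = tracker.awaitSettlement(1, TimeUnit.SECONDS);
        System.out.println("same tracker returned: " + (settled == tracker));
    }
}
```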
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
new file mode 100644
index 00000000..a042c91e
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLinkType.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
+
+import org.apache.qpid.protonj2.client.ErrorCondition;
+import org.apache.qpid.protonj2.client.Link;
+import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.Source;
+import org.apache.qpid.protonj2.client.Target;
+import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
+import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
+import org.apache.qpid.protonj2.engine.Connection;
+import org.apache.qpid.protonj2.engine.Engine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base type used by client resources that represent an AMQP link type.
+ */
+public abstract class ClientLinkType<LinkType extends Link<LinkType>,
+                                     ProtonType extends org.apache.qpid.protonj2.engine.Link<ProtonType>> implements Link<LinkType> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientLinkType.class);
+
+    @SuppressWarnings("rawtypes")
+    protected static final AtomicIntegerFieldUpdater<ClientLinkType> CLOSED_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(ClientLinkType.class, "closed");
+
+    protected final ClientFuture<LinkType> openFuture;
+    protected final ClientFuture<LinkType> closeFuture;
+
+    protected volatile int closed;
+    protected ClientException failureCause;
+
+    protected final ClientSession session;
+    protected final ScheduledExecutorService executor;
+    protected final String linkId;
+    protected final LinkOptions<?> options;
+
+    protected volatile Source remoteSource;
+    protected volatile Target remoteTarget;
+
+    protected Consumer<LinkType> linkRemotelyClosedHandler;
+
+    ClientLinkType(ClientSession session, String linkId, LinkOptions<?> options) {
+        this.session = session;
+        this.linkId = linkId;
+        this.options = options;
+        this.executor = session.getScheduler();
+        this.openFuture = session.getFutureFactory().createFuture();
+        this.closeFuture = session.getFutureFactory().createFuture();
+    }
+
+    protected abstract LinkType self();
+
+    protected abstract ProtonType protonLink();
+
+    @Override
+    public void close() {
+        try {
+            doCloseOrDetach(true, null).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Thread.interrupted();
+        }
+    }
+
+    @Override
+    public void close(ErrorCondition error) {
+        Objects.requireNonNull(error, "Error Condition cannot be null");
+
+        try {
+            doCloseOrDetach(true, error).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Thread.interrupted();
+        }
+    }
+
+    @Override
+    public void detach() {
+        try {
+            doCloseOrDetach(false, null).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Thread.interrupted();
+        }
+    }
+
+    @Override
+    public void detach(ErrorCondition error) {
+        Objects.requireNonNull(error, "Error Condition cannot be null");
+
+        try {
+            doCloseOrDetach(false, error).get();
+        } catch (InterruptedException | ExecutionException e) {
+            Thread.interrupted();
+        }
+    }
+
+    @Override
+    public ClientFuture<LinkType> closeAsync() {
+        return doCloseOrDetach(true, null);
+    }
+
+    @Override
+    public ClientFuture<LinkType> closeAsync(ErrorCondition error) {
+        Objects.requireNonNull(error, "Error Condition cannot be null");
+
+        return doCloseOrDetach(true, error);
+    }
+
+    @Override
+    public ClientFuture<LinkType> detachAsync() {
+        return doCloseOrDetach(false, null);
+    }
+
+    @Override
+    public ClientFuture<LinkType> detachAsync(ErrorCondition error) {
+        Objects.requireNonNull(error, "Error Condition cannot be null");
+
+        return doCloseOrDetach(false, error);
+    }
+
+    private ClientFuture<LinkType> doCloseOrDetach(boolean close, ErrorCondition error) {
+        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
+            // Already closed by failure or shutdown so no need to queue task
+            if (!closeFuture.isDone()) {
+                executor.execute(() -> {
+                    if (protonLink().isLocallyOpen()) {
+                        try {
+                            protonLink().setCondition(ClientErrorCondition.asProtonErrorCondition(error));
+
+                            if (close) {
+                                protonLink().close();
+                            } else {
+                                protonLink().detach();
+                            }
+                        } catch (Throwable ignore) {
+                            closeFuture.complete(self());
+                        }
+                    }
+                });
+            }
+        }
+        return closeFuture;
+    }
+
+    @Override
+    public String address() throws ClientException {
+        if (protonLink().isSender()) {
+            final org.apache.qpid.protonj2.types.messaging.Target target;
+            if (isDynamic()) {
+                waitForOpenToComplete();
+                target = protonLink().getRemoteTarget();
+            } else {
+                target = protonLink().getTarget();
+            }
+
+            if (target != null) {
+                return target.getAddress();
+            } else {
+                return null;
+            }
+        } else {
+            if (isDynamic()) {
+                waitForOpenToComplete();
+                return protonLink().getRemoteSource().getAddress();
+            } else {
+                return protonLink().getSource() != null ? protonLink().getSource().getAddress() : null;
+            }
+        }
+    }
+
+    @Override
+    public Source source() throws ClientException {
+        waitForOpenToComplete();
+        return remoteSource;
+    }
+
+    @Override
+    public Target target() throws ClientException {
+        waitForOpenToComplete();
+        return remoteTarget;
+    }
+
+    @Override
+    public Map<String, Object> properties() throws ClientException {
+        waitForOpenToComplete();
+        return ClientConversionSupport.toStringKeyedMap(protonLink().getRemoteProperties());
+    }
+
+    @Override
+    public String[] offeredCapabilities() throws ClientException {
+        waitForOpenToComplete();
+        return ClientConversionSupport.toStringArray(protonLink().getRemoteOfferedCapabilities());
+    }
+
+    @Override
+    public String[] desiredCapabilities() throws ClientException {
+        waitForOpenToComplete();
+        return ClientConversionSupport.toStringArray(protonLink().getRemoteDesiredCapabilities());
+    }
+
+    @Override
+    public ClientInstance client() {
+        return session.client();
+    }
+
+    @Override
+    public ClientConnection connection() {
+        return session.connection();
+    }
+
+    @Override
+    public ClientSession session() {
+        return session;
+    }
+
+    @Override
+    public ClientFuture<LinkType> openFuture() {
+        return openFuture;
+    }
+
+    LinkType remotelyClosedHandler(Consumer<LinkType> handler) {
+        this.linkRemotelyClosedHandler = handler;
+        return self();
+    }
+
+    String getId() {
+        return linkId;
+    }
+
+    void setFailureCause(ClientException failureCause) {
+        this.failureCause = failureCause;
+    }
+
+    ClientException getFailureCause() {
+        if (failureCause == null) {
+            return session.getFailureCause();
+        } else {
+            return failureCause;
+        }
+    }
+
+    boolean isClosed() {
+        return closed > 0;
+    }
+
+    boolean isDynamic() {
+        if (protonLink().isSender()) {
+            return protonLink().getTarget() != null && protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().isDynamic();
+        } else {
+            return protonLink().getSource() != null && protonLink().getSource().isDynamic();
+        }
+    }
+
+    final LinkType open() {
+        protonLink().localOpenHandler(this::handleLocalOpen)
+                    .localCloseHandler(this::handleLocalCloseOrDetach)
+                    .localDetachHandler(this::handleLocalCloseOrDetach)
+                    .openHandler(this::handleRemoteOpen)
+                    .closeHandler(this::handleRemoteCloseOrDetach)
+                    .detachHandler(this::handleRemoteCloseOrDetach)
+                    .parentEndpointClosedHandler(this::handleParentEndpointClosed)
+                    .engineShutdownHandler(this::handleEngineShutdown);
+
+        protonLink().open();
+
+        return self();
+    }
+
+    //----- Generic link state change handlers
+
+    protected final void handleLocalOpen(ProtonType link) {
+        linkSpecificLocalOpenHandler();
+
+        if (options.openTimeout() > 0) {
+            executor.schedule(() -> {
+                if (!openFuture.isDone()) {
+                    immediateLinkShutdown(new ClientOperationTimedOutException("Link open timed out waiting for remote to respond"));
+                }
+            }, options.openTimeout(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    protected final void handleLocalCloseOrDetach(ProtonType link) {
+        linkSpecificLocalCloseHandler();
+
+        // If not yet remotely closed we only wait for a remote close if the engine isn't
+        // already failed and we have successfully opened the link without a timeout.
+        if (!link.getEngine().isShutdown() && failureCause == null && link.isRemotelyOpen()) {
+            final long timeout = options.closeTimeout();
+
+            if (timeout > 0) {
+                session.scheduleRequestTimeout(closeFuture, timeout, () ->
+                    new ClientOperationTimedOutException("Link close timed out waiting for remote to respond"));
+            }
+        } else {
+            immediateLinkShutdown(failureCause);
+        }
+    }
+
+    protected final void handleRemoteOpen(ProtonType link) {
+        // Check for deferred close pending and hold completion if so
+        if ((link.isSender() && link.getRemoteTarget() != null) ||
+            (link.isReceiver() && link.getRemoteSource() != null)) {
+
+            if (link.getRemoteSource() != null) {
+                remoteSource = new ClientRemoteSource(link.getRemoteSource());
+            }
+
+            if (link.getRemoteTarget() != null) {
+                remoteTarget = new ClientRemoteTarget(link.getRemoteTarget());
+            }
+
+            linkSpecificRemoteOpenHandler();
+
+            openFuture.complete(self());
+            LOG.trace("Link opened successfully: {}", link);
+        } else {
+            LOG.debug("Link opened but remote signaled close is pending: {}", link);
+        }
+    }
+
+    protected final void handleRemoteCloseOrDetach(ProtonType link) {
+        linkSpecificRemoteCloseHandler();
+
+        if (link.isLocallyOpen()) {
+            if (linkRemotelyClosedHandler != null) {
+                try {
+                    linkRemotelyClosedHandler.accept(self());
+                } catch (Throwable ignore) {}
+            }
+
+            immediateLinkShutdown(ClientExceptionSupport.convertToLinkClosedException(
+                link.getRemoteCondition(), "Link remotely closed without explanation from the remote"));
+        } else {
+            immediateLinkShutdown(failureCause);
+        }
+    }
+
+    protected final void handleParentEndpointClosed(ProtonType link) {
+        // Don't react if the engine was shut down and the parent closed as a result; instead wait
+        // for the shutdown notification and respond to that change.
+        if (link.getEngine().isRunning()) {
+            final ClientException failureCause;
+
+            if (link.getConnection().getRemoteCondition() != null) {
+                failureCause = ClientExceptionSupport.convertToConnectionClosedException(link.getConnection().getRemoteCondition());
+            } else if (link.getSession().getRemoteCondition() != null) {
+                failureCause = ClientExceptionSupport.convertToSessionClosedException(link.getSession().getRemoteCondition());
+            } else if (link.getEngine().failureCause() != null) {
+                failureCause = ClientExceptionSupport.convertToConnectionClosedException(link.getEngine().failureCause());
+            } else if (!isClosed()) {
+                failureCause = new ClientResourceRemotelyClosedException("Remote closed without a specific error condition");
+            } else {
+                failureCause = null;
+            }
+
+            immediateLinkShutdown(failureCause);
+        }
+    }
+
+    protected final void handleEngineShutdown(Engine engine) {
+        if (!isDynamic() && !session.getConnection().getEngine().isShutdown()) {
+            recreateLinkForReconnect();
+            open();
+        } else {
+            final Connection connection = engine.connection();
+
+            final ClientException failureCause;
+
+            if (connection.getRemoteCondition() != null) {
+                failureCause = ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition());
+            } else if (engine.failureCause() != null) {
+                failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause());
+            } else if (!isClosed()) {
+                failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
+            } else {
+                failureCause = null;
+            }
+
+            immediateLinkShutdown(failureCause);
+        }
+    }
+
+    protected final void immediateLinkShutdown(ClientException failureCause) {
+        if (this.failureCause == null) {
+            this.failureCause = failureCause;
+        }
+
+        try {
+            if (protonLink().isRemotelyDetached()) {
+                protonLink().detach();
+            } else {
+                protonLink().close();
+            }
+        } catch (Throwable ignore) {
+            // Ignore
+        }
+
+        try {
+            linkSpecificCleanupHandler(this.failureCause);
+        } catch (Exception ex) {
+            // Ignore for now, possibly log if it becomes needed
+        } finally {
+            if (failureCause != null) {
+                openFuture.failed(failureCause);
+            } else {
+                openFuture.complete(self());
+            }
+
+            closeFuture.complete(self());
+        }
+    }
+
+    //----- Abstract API for implementations to override
+
+    protected abstract void linkSpecificLocalOpenHandler();
+
+    protected abstract void linkSpecificLocalCloseHandler();
+
+    protected abstract void linkSpecificRemoteOpenHandler();
+
+    protected abstract void linkSpecificRemoteCloseHandler();
+
+    protected abstract void linkSpecificCleanupHandler(ClientException failureCause);
+
+    protected abstract void recreateLinkForReconnect();
+
+    //----- Internal API for link implementations
+
+    protected boolean notClosedOrFailed(ClientFuture<?> request) {
+        return notClosedOrFailed(request, protonLink());
+    }
+
+    protected boolean notClosedOrFailed(ClientFuture<?> request, ProtonType protonLink) {
+        if (isClosed()) {
+            request.failed(new ClientIllegalStateException("The link was explicitly closed", failureCause));
+            return false;
+        } else if (failureCause != null) {
+            request.failed(failureCause);
+            return false;
+        } else if (protonLink.isLocallyClosedOrDetached()) {
+            if (protonLink.getConnection().getRemoteCondition() != null) {
+                request.failed(ClientExceptionSupport.convertToConnectionClosedException(protonLink.getConnection().getRemoteCondition()));
+            } else if (protonLink.getSession().getRemoteCondition() != null) {
+                request.failed(ClientExceptionSupport.convertToSessionClosedException(protonLink.getSession().getRemoteCondition()));
+            } else if (protonLink.getEngine().failureCause() != null) {
+                request.failed(ClientExceptionSupport.convertToConnectionClosedException(protonLink.getEngine().failureCause()));
+            } else {
+                request.failed(new ClientIllegalStateException(
+                    String.format("%s closed without a specific error condition", protonLink.isSender() ? "Sender" : "Receiver")));
+            }
+            return false;
+        } else {
+            return true;
+        }
+    }
+
+    protected void checkClosedOrFailed() throws ClientException {
+        if (isClosed()) {
+            throw new ClientIllegalStateException("The link was explicitly closed", failureCause);
+        } else if (failureCause != null) {
+            throw failureCause;
+        }
+    }
+
+    protected void waitForOpenToComplete() throws ClientException {
+        if (!openFuture.isComplete() || openFuture.isFailed()) {
+            try {
+                openFuture.get();
+            } catch (ExecutionException | InterruptedException e) {
+                Thread.interrupted();
+                if (failureCause != null) {
+                    throw failureCause;
+                } else {
+                    throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause());
+                }
+            }
+        }
+    }
+}
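
The shutdown handlers in ClientLinkType pick a failure cause by precedence: a
remote error condition first, then an engine failure, then a generic
remote-close error if the link was not closed locally, and otherwise no error.
A minimal standalone sketch of that ordering follows. The class and method
names (FailureCausePrecedence, select) are hypothetical, and plain strings
stand in for the protonj2 exception types.

```java
final class FailureCausePrecedence {

    // Hypothetical helper: returns a description of the selected cause, or
    // null when the link was closed locally and nothing else went wrong.
    static String select(String remoteCondition, Throwable engineFailure, boolean closedLocally) {
        if (remoteCondition != null) {
            // A condition reported by the remote peer always wins.
            return "remote condition: " + remoteCondition;
        } else if (engineFailure != null) {
            // Next, an error recorded by the protocol engine.
            return "engine failure: " + engineFailure.getMessage();
        } else if (!closedLocally) {
            // No details available, but the close was not requested locally.
            return "Remote closed without a specific error condition";
        } else {
            // Local close with no error: nothing to report.
            return null;
        }
    }
}
```

Only the last branch leaves the cause null, which is what lets
immediateLinkShutdown complete the open future normally instead of failing it.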
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
index f8933fc6..319f310d 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
@@ -177,19 +177,19 @@ final class ClientLocalTransactionContext implements ClientTransactionContext {
     }
 
     @Override
-    public ClientTransactionContext send(ClientOutgoingEnvelope envelope, DeliveryState outcome, boolean settled) {
+    public ClientTransactionContext send(Sendable sendable, DeliveryState outcome, boolean settled) {
         if (isInTransaction()) {
             if (isRollbackOnly()) {
-                envelope.discard();
+                sendable.discard();
             } else if (outcome == null) {
                 DeliveryState txnOutcome = cachedSenderOutcome != null ?
                     cachedSenderOutcome : (cachedSenderOutcome = new TransactionalState().setTxnId(currentTxn.getTxnId()));
-                envelope.sendPayload(txnOutcome, settled);
+                sendable.send(txnOutcome, settled);
             } else {
-                envelope.sendPayload(new TransactionalState().setTxnId(currentTxn.getTxnId()).setOutcome((Outcome) outcome), settled);
+                sendable.send(new TransactionalState().setTxnId(currentTxn.getTxnId()).setOutcome((Outcome) outcome), settled);
             }
         } else {
-            envelope.sendPayload(outcome, settled);
+            sendable.send(outcome, settled);
         }
 
         return this;
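
The send method above routes a Sendable along one of four paths, depending on
whether a transaction is active, whether it is rollback-only, and whether the
caller supplied an outcome. The sketch below mirrors that branch structure in
isolation. The Sendable interface shown here is a hypothetical two-method
stand-in (the real interface is not part of this hunk), and strings stand in
for DeliveryState and TransactionalState.

```java
import java.util.ArrayList;
import java.util.List;

final class SendRoutingSketch {

    // Hypothetical stand-in for the client's Sendable abstraction.
    interface Sendable {
        void send(String outcome, boolean settled);
        void discard();
    }

    // Records each call so the chosen path can be inspected.
    static final class RecordingSendable implements Sendable {
        final List<String> events = new ArrayList<>();

        @Override
        public void send(String outcome, boolean settled) {
            events.add("send:" + outcome + ":" + settled);
        }

        @Override
        public void discard() {
            events.add("discard");
        }
    }

    // Same branch order as the transaction context's send method.
    static void route(Sendable sendable, String outcome, boolean settled,
                      boolean inTransaction, boolean rollbackOnly, String txnId) {
        if (inTransaction) {
            if (rollbackOnly) {
                // The transaction can only roll back, so nothing is written.
                sendable.discard();
            } else if (outcome == null) {
                // No explicit outcome: send under the bare transactional state.
                sendable.send("txn(" + txnId + ")", settled);
            } else {
                // Wrap the caller's outcome in the transactional state.
                sendable.send("txn(" + txnId + "," + outcome + ")", settled);
            }
        } else {
            // Outside a transaction the outcome passes through unchanged.
            sendable.send(outcome, settled);
        }
    }
}
```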
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
index 28151f56..493f475f 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpStreamTracker.java
@@ -16,36 +16,47 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamTracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
 
 /**
  * A dummy Tracker instance that always indicates remote settlement and
  * acceptance for {@link StreamSender} instances.
  */
-public class ClientNoOpStreamTracker extends ClientNoOpTracker implements StreamTracker {
+public class ClientNoOpStreamTracker implements StreamTracker {
+
+    private final ClientStreamSender sender;
+
+    private DeliveryState state;
+    private boolean settled;
 
     ClientNoOpStreamTracker(ClientStreamSender sender) {
-        super(sender);
+        this.sender = sender;
     }
 
     @Override
     public StreamSender sender() {
-        return (StreamSender) super.sender();
+        return sender;
     }
 
     @Override
     public StreamTracker settle() throws ClientException {
-        return (StreamTracker) super.settle();
+        this.settled = true;
+        return this;
     }
 
     @Override
     public StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException {
-        return (StreamTracker) super.disposition(state, settle);
+        this.state = state;
+        this.settled = settle;
+
+        return this;
     }
 
     @Override
@@ -57,4 +68,39 @@ public class ClientNoOpStreamTracker extends ClientNoOpTracker implements Stream
     public StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
         return this;
     }
+
+    @Override
+    public boolean settled() {
+        return settled;
+    }
+
+    @Override
+    public DeliveryState state() {
+        return state;
+    }
+
+    @Override
+    public DeliveryState remoteState() {
+        return ClientDeliveryState.ClientAccepted.getInstance();
+    }
+
+    @Override
+    public boolean remoteSettled() {
+        return true;
+    }
+
+    @Override
+    public Future<StreamTracker> settlementFuture() {
+        return ClientFutureFactory.completedFuture(this);
+    }
+
+    @Override
+    public StreamTracker awaitAccepted() throws ClientException {
+        return this;
+    }
+
+    @Override
+    public StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
+        return this;
+    }
 }
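
With the superclass gone, ClientNoOpStreamTracker keeps its own local state
while reporting the remote side as already settled and accepted, and it hands
back a future that is complete from the start. A minimal sketch of that
pattern follows, assuming java.util.concurrent.CompletableFuture as a stand-in
for ClientFutureFactory.completedFuture and a string in place of
DeliveryState. The class name NoOpTrackerSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

final class NoOpTrackerSketch {

    // Local view of the delivery, updated only by the caller.
    private String state;
    private boolean settled;

    NoOpTrackerSketch disposition(String state, boolean settle) {
        this.state = state;
        this.settled = settle;
        return this;
    }

    String state() {
        return state;
    }

    boolean settled() {
        return settled;
    }

    // The remote view is fixed: always settled, always accepted.
    boolean remoteSettled() {
        return true;
    }

    String remoteState() {
        return "accepted";
    }

    // Already complete, so callers waiting on settlement never block.
    Future<NoOpTrackerSketch> settlementFuture() {
        return CompletableFuture.completedFuture(this);
    }
}
```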
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTransactionContext.java
index ca4aeada..df5773f6 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientNoOpTransactionContext.java
@@ -40,7 +40,7 @@ final class ClientNoOpTransactionContext implements ClientTransactionContext {
 
     @Override
     public ClientTransactionContext rollback(ClientFuture<Session> rollbackFuture, boolean startNew) throws ClientIllegalStateException {
-        throw new ClientIllegalStateException("Cannot rollback from a no-op transaction context");
+        throw new ClientIllegalStateException("Cannot roll back from a no-op transaction context");
     }
 
     @Override
@@ -54,8 +54,8 @@ final class ClientNoOpTransactionContext implements ClientTransactionContext {
     }
 
     @Override
-    public ClientTransactionContext send(ClientOutgoingEnvelope envelope, DeliveryState outcome, boolean settled) {
-        envelope.sendPayload(outcome, settled);
+    public ClientTransactionContext send(Sendable sendable, DeliveryState outcome, boolean settled) {
+        sendable.send(outcome, settled);
         return this;
     }
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientOutgoingEnvelope.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientOutgoingEnvelope.java
deleted file mode 100644
index 2c022d66..00000000
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientOutgoingEnvelope.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.protonj2.client.impl;
-
-import java.util.concurrent.ScheduledFuture;
-
-import org.apache.qpid.protonj2.buffer.ProtonBuffer;
-import org.apache.qpid.protonj2.client.Tracker;
-import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
-import org.apache.qpid.protonj2.client.futures.ClientFuture;
-import org.apache.qpid.protonj2.engine.OutgoingDelivery;
-import org.apache.qpid.protonj2.types.transport.DeliveryState;
-
-/**
- * Tracking object used to manage the life-cycle of a send of message payload
- * to the remote which can be stalled either for link or session credit limits.
- * The envelope carries sufficient information to write payload bytes as credit
- * is available.
- */
-public class ClientOutgoingEnvelope {
-
-    private final ProtonBuffer payload;
-    private final ClientFuture<Tracker> request;
-    private final ClientSender sender;
-    private final boolean complete;
-    private final int messageFormat;
-
-    private boolean aborted;
-    private ScheduledFuture<?> sendTimeout;
-    private OutgoingDelivery delivery;
-
-    /**
-     * Create a new In-flight Send instance for a complete message send.  No further
-     * sends can occur after the send completes.
-     *
-     * @param sender
-     *      The {@link ClientSender} instance that is attempting to send this encoded message.
-     * @param messageFormat
-     *      The message format code to assign the send if this is the first delivery.
-     * @param payload
-     *      The payload that comprises this portion of the send.
-     * @param request
-     *      The requesting operation that initiated this send.
-     */
-    ClientOutgoingEnvelope(ClientSender sender, int messageFormat, ProtonBuffer payload, ClientFuture<Tracker> request) {
-        this.messageFormat = messageFormat;
-        this.payload = payload;
-        this.request = request;
-        this.sender = sender;
-        this.complete = true;
-    }
-
-    /**
-     * Create a new In-flight Send instance.
-     *
-     * @param sender
-     *      The {@link ClientSender} instance that is attempting to send this encoded message.
-     * @param messageFormat
-     *      The message format code to assign the send if this is the first delivery.
-     * @param payload
-     *      The payload that comprises this portion of the send.
-     * @param complete
-     *      Indicates if the encoded payload represents the complete transfer or if more is coming.
-     * @param request
-     *      The requesting operation that initiated this send.
-     */
-    ClientOutgoingEnvelope(ClientSender sender, int messageFormat, ProtonBuffer payload, boolean complete, ClientFuture<Tracker> request) {
-        this.payload = payload;
-        this.request = request;
-        this.sender = sender;
-        this.complete = complete;
-        this.messageFormat = messageFormat;
-    }
-
-    /**
-     * Create a new In-flight Send instance that is a continuation on an existing delivery.
-     *
-     * @param sender
-     *      The {@link ClientSender} instance that is attempting to send this encoded message.
-     * @param messageFormat
-     *      The message format code to assign the send if this is the first delivery.
-     * @param delivery
-     *      The {@link OutgoingDelivery} context this envelope will be added to.
-     * @param payload
-     *      The payload that comprises this portion of the send.
-     * @param complete
-     *      Indicates if the encoded payload represents the complete transfer or if more is coming.
-     * @param request
-     *      The requesting operation that initiated this send.
-     */
-    public ClientOutgoingEnvelope(ClientSender sender, OutgoingDelivery delivery, int messageFormat, ProtonBuffer payload, boolean complete, ClientFuture<Tracker> request) {
-        this.payload = payload;
-        this.request = request;
-        this.sender = sender;
-        this.complete = complete;
-        this.messageFormat = messageFormat;
-        this.delivery = delivery;
-    }
-
-    /**
-     * @return the {@link ScheduledFuture} used to determine when the send should fail if no credit available to write.
-     */
-    public ScheduledFuture<?> sendTimeout() {
-        return sendTimeout;
-    }
-
-    /**
-     * Sets the {@link ScheduledFuture} which should be used when a send cannot be immediately performed.
-     *
-     * @param sendTimeout
-     * 		The {@link ScheduledFuture} that will fail the send if not cancelled once it has been performed.
-     */
-    public void sendTimeout(ScheduledFuture<?> sendTimeout) {
-        this.sendTimeout = sendTimeout;
-    }
-
-    public ProtonBuffer payload() {
-        return payload;
-    }
-
-    public OutgoingDelivery delivery() {
-        return delivery;
-    }
-
-    public ClientOutgoingEnvelope abort() {
-        this.aborted = true;
-        return this;
-    }
-
-    public ClientSender sender() {
-        return sender;
-    }
-
-    public boolean aborted() {
-        return aborted;
-    }
-
-    public ClientOutgoingEnvelope discard() {
-        if (sendTimeout != null) {
-            sendTimeout.cancel(true);
-            sendTimeout = null;
-        }
-
-        if (delivery != null) {
-            ClientTracker tracker = delivery.getLinkedResource();
-            if (tracker != null) {
-                tracker.settlementFuture().complete(tracker);
-            }
-            request.complete(delivery.getLinkedResource());
-        } else {
-            request.complete(sender.createNoOpTracker());
-        }
-
-        return this;
-    }
-
-    public ClientOutgoingEnvelope succeeded() {
-        if (sendTimeout != null) {
-            sendTimeout.cancel(true);
-        }
-
-        request.complete(delivery.getLinkedResource());
-
-        return this;
-    }
-
-    public ClientOutgoingEnvelope failed(ClientException exception) {
-        if (sendTimeout != null) {
-            sendTimeout.cancel(true);
-        }
-
-        request.failed(exception);
-
-        return this;
-    }
-
-    public void sendPayload(DeliveryState state, boolean settled) {
-        if (delivery == null) {
-            delivery = sender.getProtonSender().next();
-            delivery.setLinkedResource(sender.createTracker(delivery));
-        }
-
-        if (delivery.getTransferCount() == 0) {
-            delivery.setMessageFormat(messageFormat);
-            delivery.disposition(state, settled);
-        }
-
-        // We must check if the delivery was fully written and then complete the send operation otherwise
-        // if the session capacity limited the amount of payload data we need to hold the completion until
-        // the session capacity is refilled and we can fully write the remaining message payload.  This
-        // area could use some enhancement to allow control of write and flush when dealing with delivery
-        // modes that have low assurance versus those that are strict.
-        if (aborted()) {
-            delivery.abort();
-            succeeded();
-        } else {
-            sender.connection().autoFlushOff();
-            try {
-                delivery.streamBytes(payload, complete);
-                if (payload != null && payload.isReadable()) {
-                    sender.addToHeadOfBlockedQueue(this);
-                } else {
-                    succeeded();
-                }
-                sender.connection().flush();
-            } finally {
-                sender.connection().autoFlushOn();
-            }
-        }
-    }
-
-    public ClientException createSendTimedOutException() {
-        return new ClientSendTimedOutException("Timed out waiting for credit to send");
-    }
-}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
index f3dd627c..b13e1fd3 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiver.java
@@ -16,30 +16,20 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.qpid.protonj2.client.Delivery;
-import org.apache.qpid.protonj2.client.ErrorCondition;
 import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
-import org.apache.qpid.protonj2.client.Source;
-import org.apache.qpid.protonj2.client.Target;
-import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
 import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.client.util.FifoDeliveryQueue;
-import org.apache.qpid.protonj2.engine.Connection;
-import org.apache.qpid.protonj2.engine.Engine;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.types.messaging.Accepted;
 import org.apache.qpid.protonj2.types.messaging.Released;
@@ -50,38 +40,24 @@ import org.slf4j.LoggerFactory;
 /**
  * Client {@link Receiver} implementation.
  */
-public final class ClientReceiver implements Receiver {
+public final class ClientReceiver extends ClientLinkType<Receiver, org.apache.qpid.protonj2.engine.Receiver> implements Receiver {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
 
-    private static final AtomicIntegerFieldUpdater<ClientReceiver> CLOSED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientReceiver.class, "closed");
-
-    private final ClientFuture<Receiver> openFuture;
-    private final ClientFuture<Receiver> closeFuture;
     private ClientFuture<Receiver> drainingFuture;
     private ScheduledFuture<?> drainingTimeout;
 
     private final ReceiverOptions options;
-    private final ClientSession session;
     private final ScheduledExecutorService executor;
-    private final String receiverId;
     private final FifoDeliveryQueue messageQueue;
-    private volatile int closed;
-    private ClientException failureCause;
 
     private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
 
-    private volatile Source remoteSource;
-    private volatile Target remoteTarget;
-
     ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
+        super(session, receiverId, options);
+
         this.options = options;
-        this.session = session;
-        this.receiverId = receiverId;
         this.executor = session.getScheduler();
-        this.openFuture = session.getFutureFactory().createFuture();
-        this.closeFuture = session.getFutureFactory().createFuture();
         this.protonReceiver = receiver.setLinkedResource(this);
 
         if (options.creditWindow() > 0) {
@@ -92,48 +68,6 @@ public final class ClientReceiver implements Receiver {
         messageQueue.start();
     }
 
-    @Override
-    public String address() throws ClientException {
-        if (isDynamic()) {
-            waitForOpenToComplete();
-            return protonReceiver.getRemoteSource().getAddress();
-        } else {
-            return protonReceiver.getSource() != null ? protonReceiver.getSource().getAddress() : null;
-        }
-    }
-
-    @Override
-    public Source source() throws ClientException {
-        waitForOpenToComplete();
-        return remoteSource;
-    }
-
-    @Override
-    public Target target() throws ClientException {
-        waitForOpenToComplete();
-        return remoteTarget;
-    }
-
-    @Override
-    public ClientInstance client() {
-        return session.client();
-    }
-
-    @Override
-    public ClientConnection connection() {
-        return session.connection();
-    }
-
-    @Override
-    public ClientSession session() {
-        return session;
-    }
-
-    @Override
-    public Future<Receiver> openFuture() {
-        return openFuture;
-    }
-
     @Override
     public Delivery receive() throws ClientException {
         return receive(-1, TimeUnit.MILLISECONDS);
@@ -182,95 +116,6 @@ public final class ClientReceiver implements Receiver {
         return delivery;
     }
 
-    @Override
-    public void close() {
-        try {
-            doCloseOrDetach(true, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void close(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(true, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach() {
-        try {
-            doCloseOrDetach(false, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(false, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public ClientFuture<Receiver> closeAsync() {
-        return doCloseOrDetach(true, null);
-    }
-
-    @Override
-    public ClientFuture<Receiver> closeAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        return doCloseOrDetach(true, error);
-    }
-
-    @Override
-    public ClientFuture<Receiver> detachAsync() {
-        return doCloseOrDetach(false, null);
-    }
-
-    @Override
-    public ClientFuture<Receiver> detachAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "The provided Error Condition cannot be null");
-
-        return doCloseOrDetach(false, error);
-    }
-
-    private ClientFuture<Receiver> doCloseOrDetach(boolean close, ErrorCondition error) {
-        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            // Already closed by failure or shutdown so no need to queue task
-            if (!closeFuture.isDone()) {
-                executor.execute(() -> {
-                    if (protonReceiver.isLocallyOpen()) {
-                        try {
-                            protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
-                            if (close) {
-                                protonReceiver.close();
-                            } else {
-                                protonReceiver.detach();
-                            }
-                        } catch (Throwable ignore) {
-                            closeFuture.complete(this);
-                        }
-                    }
-                });
-            }
-        }
-
-        return closeFuture;
-    }
-
     @Override
     public long queuedDeliveries() {
         return messageQueue.size();
@@ -330,24 +175,6 @@ public final class ClientReceiver implements Receiver {
         return drainComplete;
     }
 
-    @Override
-    public Map<String, Object> properties() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringKeyedMap(protonReceiver.getRemoteProperties());
-    }
-
-    @Override
-    public String[] offeredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonReceiver.getRemoteOfferedCapabilities());
-    }
-
-    @Override
-    public String[] desiredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonReceiver.getRemoteDesiredCapabilities());
-    }
-
     //----- Internal API for the ClientReceiver and other Client objects
 
     void disposition(IncomingDelivery delivery, DeliveryState state, boolean settle) throws ClientException {
@@ -355,166 +182,22 @@ public final class ClientReceiver implements Receiver {
         asyncApplyDisposition(delivery, state, settle);
     }
 
-    ClientReceiver open() {
-        protonReceiver.localOpenHandler(this::handleLocalOpen)
-                      .localCloseHandler(this::handleLocalCloseOrDetach)
-                      .localDetachHandler(this::handleLocalCloseOrDetach)
-                      .openHandler(this::handleRemoteOpen)
-                      .closeHandler(this::handleRemoteCloseOrDetach)
-                      .detachHandler(this::handleRemoteCloseOrDetach)
-                      .parentEndpointClosedHandler(this::handleParentEndpointClosed)
-                      .deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
-                      .deliveryReadHandler(this::handleDeliveryReceived)
-                      .deliveryAbortedHandler(this::handleDeliveryAborted)
-                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated)
-                      .engineShutdownHandler(this::handleEngineShutdown)
-                      .open();
-
-        return this;
-    }
-
-    void setFailureCause(ClientException failureCause) {
-        this.failureCause = failureCause;
-    }
-
-    ClientException getFailureCause() {
-        if (failureCause == null) {
-            return session.getFailureCause();
-        } else {
-            return failureCause;
-        }
-    }
-
-    String getId() {
-        return receiverId;
-    }
-
-    boolean isClosed() {
-        return closed > 0;
-    }
-
+    @Override
     boolean isDynamic() {
         return protonReceiver.getSource() != null && protonReceiver.getSource().isDynamic();
     }
 
-    //----- Handlers for proton receiver events
-
-    private void handleLocalOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        if (options.openTimeout() > 0) {
-            executor.schedule(() -> {
-                if (!openFuture.isDone()) {
-                    immediateLinkShutdown(new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
-                }
-            }, options.openTimeout(), TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private void handleLocalCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        messageQueue.stop();  // Ensure blocked receivers are all unblocked.
-
-        // If not yet remotely closed we only wait for a remote close if the engine isn't
-        // already failed and we have successfully opened the sender without a timeout.
-        if (!receiver.getEngine().isShutdown() && failureCause == null && receiver.isRemotelyOpen()) {
-            final long timeout = options.closeTimeout();
-
-            if (timeout > 0) {
-                session.scheduleRequestTimeout(closeFuture, timeout, () ->
-                new ClientOperationTimedOutException("receiver close timed out waiting for remote to respond"));
-            }
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleRemoteOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        // Check for deferred close pending and hold completion if so
-        if (receiver.getRemoteSource() != null) {
-            remoteSource = new ClientRemoteSource(receiver.getRemoteSource());
-
-            if (receiver.getRemoteTarget() != null) {
-                remoteTarget = new ClientRemoteTarget(receiver.getRemoteTarget());
-            }
-
-            replenishCreditIfNeeded();
-
-            openFuture.complete(this);
-            LOG.trace("Receiver opened successfully: {}", receiverId);
-        } else {
-            LOG.debug("Receiver opened but remote signalled close is pending: {}", receiverId);
-        }
-    }
-
-    private void handleRemoteCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        if (receiver.isLocallyOpen()) {
-            immediateLinkShutdown(ClientExceptionSupport.convertToLinkClosedException(
-                receiver.getRemoteCondition(), "Receiver remotely closed without explanation from the remote"));
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
+    @Override
+    protected Receiver self() {
+        return this;
     }
 
-    private void handleParentEndpointClosed(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        // Don't react if engine was shutdown and parent closed as a result instead wait to get the
-        // shutdown notification and respond to that change.
-        if (receiver.getEngine().isRunning()) {
-            final ClientException failureCause;
-
-            if (receiver.getConnection().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(receiver.getConnection().getRemoteCondition());
-            } else if (receiver.getSession().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToSessionClosedException(receiver.getSession().getRemoteCondition());
-            } else if (receiver.getEngine().failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(receiver.getEngine().failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientResourceRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
+    @Override
+    protected org.apache.qpid.protonj2.engine.Receiver protonLink() {
+        return protonReceiver;
     }
 
-    private void handleEngineShutdown(Engine engine) {
-        if (!isDynamic() && !session.getConnection().getEngine().isShutdown()) {
-            int previousCredit = protonReceiver.getCredit() + messageQueue.size();
-
-            messageQueue.clear();  // Prefetched messages should be discarded.
-
-            if (drainingFuture != null) {
-                drainingFuture.complete(this);
-                if (drainingTimeout != null) {
-                    drainingTimeout.cancel(false);
-                    drainingTimeout = null;
-                }
-            }
-
-            protonReceiver.localCloseHandler(null);
-            protonReceiver.localDetachHandler(null);
-            protonReceiver.close();
-            protonReceiver = ClientReceiverBuilder.recreateReceiver(session, protonReceiver, options);
-            protonReceiver.setLinkedResource(this);
-            protonReceiver.addCredit(previousCredit);
-
-            open();
-        } else {
-            final Connection connection = engine.connection();
-
-            final ClientException failureCause;
-
-            if (connection.getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition());
-            } else if (engine.failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
-    }
+    //----- Handlers for proton receiver events
 
     private void handleDeliveryReceived(IncomingDelivery delivery) {
         LOG.trace("Delivery data was received: {}", delivery);
@@ -592,72 +275,58 @@ public final class ClientReceiver implements Receiver {
         }
     }
 
-    private void waitForOpenToComplete() throws ClientException {
-        if (!openFuture.isComplete() || openFuture.isFailed()) {
-            try {
-                openFuture.get();
-            } catch (ExecutionException | InterruptedException e) {
-                Thread.interrupted();
-                if (failureCause != null) {
-                    throw failureCause;
-                } else {
-                    throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause());
-                }
-            }
-        }
-    }
-
-    private boolean notClosedOrFailed(ClientFuture<?> request) {
-        if (isClosed()) {
-            request.failed(new ClientIllegalStateException("The Receiver was explicitly closed", failureCause));
-            return false;
-        } else if (failureCause != null) {
-            request.failed(failureCause);
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    private void checkClosedOrFailed() throws ClientException {
-        if (isClosed()) {
-            throw new ClientIllegalStateException("The Receiver was explicitly closed", failureCause);
-        } else if (failureCause != null) {
-            throw failureCause;
-        }
-    }
+    @Override
+    protected void recreateLinkForReconnect() {
+        int previousCredit = protonReceiver.getCredit() + messageQueue.size();
 
-    private void immediateLinkShutdown(ClientException failureCause) {
-        if (this.failureCause == null) {
-            this.failureCause = failureCause;
-        }
+        messageQueue.clear();  // Prefetched messages should be discarded.
 
-        try {
-            if (protonReceiver.isRemotelyDetached()) {
-                protonReceiver.detach();
-            } else {
-                protonReceiver.close();
+        if (drainingFuture != null) {
+            drainingFuture.complete(this);
+            if (drainingTimeout != null) {
+                drainingTimeout.cancel(false);
+                drainingTimeout = null;
             }
-        } catch (Exception ignore) {
         }
 
-        if (failureCause != null) {
-            openFuture.failed(failureCause);
-            if (drainingFuture != null) {
-                drainingFuture.failed(failureCause);
-            }
-        } else {
-            openFuture.complete(this);
-            if (drainingFuture != null) {
-                drainingFuture.failed(new ClientResourceRemotelyClosedException("The Receiver has been closed"));
-            }
-        }
+        protonReceiver.localCloseHandler(null);
+        protonReceiver.localDetachHandler(null);
+        protonReceiver.close();
+        protonReceiver = ClientReceiverBuilder.recreateReceiver(session, protonReceiver, options);
+        protonReceiver.setLinkedResource(this);
+        protonReceiver.addCredit(previousCredit);
+    }
 
+    @Override
+    protected void linkSpecificCleanupHandler(ClientException failureCause) {
         if (drainingTimeout != null) {
+            drainingFuture.failed(
+                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
             drainingTimeout.cancel(false);
             drainingTimeout = null;
         }
+    }
 
-        closeFuture.complete(this);
+    @Override
+    protected void linkSpecificLocalOpenHandler() {
+        protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
+                      .deliveryReadHandler(this::handleDeliveryReceived)
+                      .deliveryAbortedHandler(this::handleDeliveryAborted)
+                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
+    }
+
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        messageQueue.stop();  // Ensure blocked receivers are all unblocked.
+    }
+
+    @Override
+    protected void linkSpecificRemoteOpenHandler() {
+        replenishCreditIfNeeded();
+    }
+
+    @Override
+    protected void linkSpecificRemoteCloseHandler() {
+        // Nothing needed for receiver link remote close
     }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverBuilder.java
index a191fcd0..5f6e66fa 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientReceiverBuilder.java
@@ -19,6 +19,7 @@ package org.apache.qpid.protonj2.client.impl;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.qpid.protonj2.client.LinkOptions;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.SessionOptions;
 import org.apache.qpid.protonj2.client.SourceOptions;
@@ -105,7 +106,7 @@ final class ClientReceiverBuilder {
         return new ClientStreamReceiver(session, options, receiverId, protonReceiver);
     }
 
-    public static Receiver recreateReceiver(ClientSession session, Receiver previousReceiver, ReceiverOptions options) {
+    public static Receiver recreateReceiver(ClientSession session, Receiver previousReceiver, LinkOptions<?> options) {
         final Receiver protonReceiver = session.getProtonSession().receiver(previousReceiver.getName());
 
         protonReceiver.setSource(previousReceiver.getSource());
@@ -129,7 +130,7 @@ final class ClientReceiverBuilder {
         return session.id() + ":" + receiverCounter.incrementAndGet();
     }
 
-    private Receiver createReceiver(String address, ReceiverOptions options, String receiverId) {
+    private Receiver createReceiver(String address, LinkOptions<?> options, String receiverId) {
         final String linkName;
 
         if (options.linkName() != null) {
@@ -159,7 +160,7 @@ final class ClientReceiverBuilder {
         return protonReceiver;
     }
 
-    private Source createSource(String address, ReceiverOptions options) {
+    private Source createSource(String address, LinkOptions<?> options) {
         final SourceOptions sourceOptions = options.sourceOptions();
 
         Source source = new Source();
@@ -195,7 +196,7 @@ final class ClientReceiverBuilder {
         return source;
     }
 
-    private Source createDurableSource(String address, ReceiverOptions options) {
+    private Source createDurableSource(String address, LinkOptions<?> options) {
         final SourceOptions sourceOptions = options.sourceOptions();
         final Source source = new Source();
 
@@ -217,7 +218,7 @@ final class ClientReceiverBuilder {
         return source;
     }
 
-    private Target createTarget(String address, ReceiverOptions options) {
+    private Target createTarget(String address, LinkOptions<?> options) {
         final TargetOptions targetOptions = options.targetOptions();
         final Target target = new Target();
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 8686c4c2..f449d400 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -19,33 +19,20 @@ package org.apache.qpid.protonj2.client.impl;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.function.Consumer;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.client.AdvancedMessage;
-import org.apache.qpid.protonj2.client.ErrorCondition;
 import org.apache.qpid.protonj2.client.Message;
 import org.apache.qpid.protonj2.client.Sender;
 import org.apache.qpid.protonj2.client.SenderOptions;
-import org.apache.qpid.protonj2.client.Source;
-import org.apache.qpid.protonj2.client.Target;
 import org.apache.qpid.protonj2.client.Tracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
-import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
-import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
 import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
-import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
-import org.apache.qpid.protonj2.client.futures.ClientSynchronization;
-import org.apache.qpid.protonj2.engine.Connection;
-import org.apache.qpid.protonj2.engine.Engine;
-import org.apache.qpid.protonj2.engine.LinkState;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
 import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
@@ -55,197 +42,24 @@ import org.slf4j.LoggerFactory;
 /**
  * Proton based AMQP Sender
  */
-class ClientSender implements Sender {
+final class ClientSender extends ClientSenderLinkType<Sender> implements Sender {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientSender.class);
 
-    protected static final AtomicIntegerFieldUpdater<ClientSender> CLOSED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientSender.class, "closed");
+    private final boolean sendsSettled;
+    private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
+    private final SenderOptions options;
 
-    protected final ClientFuture<Sender> openFuture;
-    protected final ClientFuture<Sender> closeFuture;
-
-    protected volatile int closed;
-    protected ClientException failureCause;
-
-    protected final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
-    protected final SenderOptions options;
-    protected final ClientSession session;
-    protected final ScheduledExecutorService executor;
-    protected final String senderId;
-    protected final boolean sendsSettled;
-    protected org.apache.qpid.protonj2.engine.Sender protonSender;
-    protected Consumer<Sender> senderRemotelyClosedHandler;
-
-    protected volatile Source remoteSource;
-    protected volatile Target remoteTarget;
+    private org.apache.qpid.protonj2.engine.Sender protonSender;
 
     ClientSender(ClientSession session, SenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
+        super(session, senderId, options);
+
         this.options = new SenderOptions(options);
-        this.session = session;
-        this.senderId = senderId;
-        this.executor = session.getScheduler();
-        this.openFuture = session.getFutureFactory().createFuture();
-        this.closeFuture = session.getFutureFactory().createFuture();
         this.protonSender = protonSender.setLinkedResource(this);
         this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
-    @Override
-    public String address() throws ClientException {
-        final org.apache.qpid.protonj2.types.messaging.Target target;
-        if (isDynamic()) {
-            waitForOpenToComplete();
-            target = protonSender.getRemoteTarget();
-        } else {
-            target = protonSender.getTarget();
-        }
-
-        if (target != null) {
-            return target.getAddress();
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public Source source() throws ClientException {
-        waitForOpenToComplete();
-        return remoteSource;
-    }
-
-    @Override
-    public Target target() throws ClientException {
-        waitForOpenToComplete();
-        return remoteTarget;
-    }
-
-    @Override
-    public ClientInstance client() {
-        return session.client();
-    }
-
-    @Override
-    public ClientConnection connection() {
-        return session.connection();
-    }
-
-    @Override
-    public ClientSession session() {
-        return session;
-    }
-
-    @Override
-    public ClientFuture<Sender> openFuture() {
-        return openFuture;
-    }
-
-    @Override
-    public void close() {
-        try {
-            doCloseOrDetach(true, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void close(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(true, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach() {
-        try {
-            doCloseOrDetach(false, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(false, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public ClientFuture<Sender> closeAsync() {
-        return doCloseOrDetach(true, null);
-    }
-
-    @Override
-    public ClientFuture<Sender> closeAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        return doCloseOrDetach(true, error);
-    }
-
-    @Override
-    public ClientFuture<Sender> detachAsync() {
-        return doCloseOrDetach(false, null);
-    }
-
-    @Override
-    public ClientFuture<Sender> detachAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        return doCloseOrDetach(false, error);
-    }
-
-    private ClientFuture<Sender> doCloseOrDetach(boolean close, ErrorCondition error) {
-        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            // Already closed by failure or shutdown so no need to queue task
-            if (!closeFuture.isDone()) {
-                executor.execute(() -> {
-                    if (protonSender.isLocallyOpen()) {
-                        try {
-                            protonSender.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
-                            if (close) {
-                                protonSender.close();
-                            } else {
-                                protonSender.detach();
-                            }
-                        } catch (Throwable ignore) {
-                            closeFuture.complete(this);
-                        }
-                    }
-                });
-            }
-        }
-        return closeFuture;
-    }
-
-    @Override
-    public Map<String, Object> properties() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringKeyedMap(protonSender.getRemoteProperties());
-    }
-
-    @Override
-    public String[] offeredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonSender.getRemoteOfferedCapabilities());
-    }
-
-    @Override
-    public String[] desiredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonSender.getRemoteDesiredCapabilities());
-    }
-
     @Override
     public Tracker send(Message<?> message) throws ClientException {
         checkClosedOrFailed();
@@ -276,11 +90,6 @@ class ClientSender implements Sender {
         return this.options;
     }
 
-    Sender remotelyClosedHandler(Consumer<Sender> handler) {
-        this.senderRemotelyClosedHandler = handler;
-        return this;
-    }
-
     void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
         checkClosedOrFailed();
         executor.execute(() -> {
@@ -288,210 +97,16 @@ class ClientSender implements Sender {
         });
     }
 
-    void abort(OutgoingDelivery delivery, ClientTracker tracker) throws ClientException {
-        checkClosedOrFailed();
-        ClientFuture<Tracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<Tracker>() {
-
-            @Override
-            public void onPendingSuccess(Tracker result) {
-                handleCreditStateUpdated(getProtonSender());
-            }
-
-            @Override
-            public void onPendingFailure(Throwable cause) {
-                handleCreditStateUpdated(getProtonSender());
-            }
-        });
-
-        executor.execute(() -> {
-            if (delivery.getTransferCount() == 0) {
-                delivery.abort();
-                request.complete(tracker);
-            } else {
-                ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, false, request).abort();
-                try {
-                    if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) {
-                        envelope.sendPayload(delivery.getState(), delivery.isSettled());
-                    } else {
-                        if (protonSender.current() == delivery) {
-                            addToHeadOfBlockedQueue(envelope);
-                        } else {
-                            addToTailOfBlockedQueue(envelope);
-                        }
-                    }
-                } catch (Exception error) {
-                    request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
-                }
-            }
-        });
-
-        session.request(this, request);
-    }
-
-    void complete(OutgoingDelivery delivery, ClientTracker tracker) throws ClientException {
-        checkClosedOrFailed();
-        ClientFuture<Tracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<Tracker>() {
-
-            @Override
-            public void onPendingSuccess(Tracker result) {
-                handleCreditStateUpdated(getProtonSender());
-            }
-
-            @Override
-            public void onPendingFailure(Throwable cause) {
-                handleCreditStateUpdated(getProtonSender());
-            }
-        });
-
-        executor.execute(() -> {
-            ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, true, request);
-            try {
-                if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) {
-                    envelope.sendPayload(delivery.getState(), delivery.isSettled());
-                } else {
-                    if (protonSender.current() == delivery) {
-                        addToHeadOfBlockedQueue(envelope);
-                    } else {
-                        addToTailOfBlockedQueue(envelope);
-                    }
-                }
-            } catch (Exception error) {
-                request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
-            }
-        });
-
-        session.request(this, request);
-    }
-
-    ClientSender open() {
-        protonSender.localOpenHandler(this::handleLocalOpen)
-                    .localCloseHandler(this::handleLocalCloseOrDetach)
-                    .localDetachHandler(this::handleLocalCloseOrDetach)
-                    .openHandler(this::handleRemoteOpen)
-                    .closeHandler(this::handleRemoteCloseOrDetach)
-                    .detachHandler(this::handleRemoteCloseOrDetach)
-                    .parentEndpointClosedHandler(this::handleParentEndpointClosed)
-                    .creditStateUpdateHandler(this::handleCreditStateUpdated)
-                    .engineShutdownHandler(this::handleEngineShutdown)
-                    .open();
-
-        return this;
-    }
-
-    void setFailureCause(ClientException failureCause) {
-        this.failureCause = failureCause;
-    }
-
     org.apache.qpid.protonj2.engine.Sender getProtonSender() {
         return protonSender;
     }
 
-    ClientException getFailureCause() {
-        if (failureCause == null) {
-            return session.getFailureCause();
-        } else {
-            return failureCause;
-        }
-    }
-
-    String getId() {
-        return senderId;
-    }
-
-    boolean isClosed() {
-        return closed > 0;
-    }
-
-    boolean isAnonymous() {
-        return protonSender.<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
-    }
-
-    boolean isDynamic() {
-        return protonSender.getTarget() != null && protonSender.<org.apache.qpid.protonj2.types.messaging.Target>getTarget().isDynamic();
-    }
-
     boolean isSendingSettled() {
         return sendsSettled;
     }
 
     //----- Handlers for proton receiver events
 
-    private void handleLocalOpen(org.apache.qpid.protonj2.engine.Sender sender) {
-        if (options.openTimeout() > 0) {
-            executor.schedule(() -> {
-                if (!openFuture.isDone()) {
-                    immediateLinkShutdown(new ClientOperationTimedOutException("Sender open timed out waiting for remote to respond"));
-                }
-            }, options.openTimeout(), TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private void handleLocalCloseOrDetach(org.apache.qpid.protonj2.engine.Sender sender) {
-        // If not yet remotely closed we only wait for a remote close if the engine isn't
-        // already failed and we have successfully opened the sender without a timeout.
-        if (!sender.getEngine().isShutdown() && failureCause == null && sender.isRemotelyOpen()) {
-            final long timeout = options.closeTimeout();
-
-            if (timeout > 0) {
-                session.scheduleRequestTimeout(closeFuture, timeout, () ->
-                    new ClientOperationTimedOutException("Sender close timed out waiting for remote to respond"));
-            }
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleParentEndpointClosed(org.apache.qpid.protonj2.engine.Sender sender) {
-        // Don't react if engine was shutdown and parent closed as a result instead wait to get the
-        // shutdown notification and respond to that change.
-        if (sender.getEngine().isRunning()) {
-            final ClientException failureCause;
-
-            if (sender.getConnection().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(sender.getConnection().getRemoteCondition());
-            } else if (sender.getSession().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToSessionClosedException(sender.getSession().getRemoteCondition());
-            } else if (sender.getEngine().failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(sender.getEngine().failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientResourceRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleRemoteOpen(org.apache.qpid.protonj2.engine.Sender sender) {
-        // Check for deferred close pending and hold completion if so
-        if (sender.getRemoteTarget() != null) {
-            remoteSource = new ClientRemoteSource(sender.getRemoteSource());
-
-            if (sender.getRemoteTarget() != null) {
-                remoteTarget = new ClientRemoteTarget(sender.getRemoteTarget());
-            }
-
-            openFuture.complete(this);
-            LOG.trace("Sender opened successfully");
-        } else {
-            LOG.debug("Sender opened but remote signalled close is pending: ", sender);
-        }
-    }
-
-    private void handleRemoteCloseOrDetach(org.apache.qpid.protonj2.engine.Sender sender) {
-        if (sender.isLocallyOpen()) {
-            try {
-                senderRemotelyClosedHandler.accept(this);
-            } catch (Throwable ignore) {}
-
-            immediateLinkShutdown(ClientExceptionSupport.convertToLinkClosedException(
-                sender.getRemoteCondition(), "Sender remotely closed without explanation from the remote"));
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
     private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) {
         if (!blocked.isEmpty()) {
             while (sender.isSendable() && !blocked.isEmpty()) {
@@ -519,62 +134,19 @@ class ClientSender implements Sender {
         }
     }
 
-    private void handleEngineShutdown(Engine engine) {
-        if (!isDynamic() && !session.getConnection().getEngine().isShutdown()) {
-            protonSender.localCloseHandler(null);
-            protonSender.localDetachHandler(null);
-            protonSender.close();
-            if (protonSender.hasUnsettled()) {
-                failPendingUnsettledAndBlockedSends(
-                    new ClientConnectionRemotelyClosedException("Connection failed and send result is unknown"));
-            }
-            protonSender = ClientSenderBuilder.recreateSender(session, protonSender, options);
-            protonSender.setLinkedResource(this);
-
-            open();
-        } else {
-            final Connection connection = engine.connection();
-
-            final ClientException failureCause;
-
-            if (connection.getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition());
-            } else if (engine.failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
-    }
+    //----- Internal class implementation details
 
-    void handleAnonymousRelayNotSupported() {
-        if (isAnonymous() && protonSender.getState() == LinkState.IDLE) {
-            immediateLinkShutdown(new ClientUnsupportedOperationException("Anonymous relay support not available from this connection"));
-        }
+    @Override
+    protected Sender self() {
+        return this;
     }
 
-    //----- Private implementation details
-
-    private void waitForOpenToComplete() throws ClientException {
-        if (!openFuture.isComplete() || openFuture.isFailed()) {
-            try {
-                openFuture.get();
-            } catch (ExecutionException | InterruptedException e) {
-                Thread.interrupted();
-                if (failureCause != null) {
-                    throw failureCause;
-                } else {
-                    throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause());
-                }
-            }
-        }
+    @Override
+    protected org.apache.qpid.protonj2.engine.Sender protonLink() {
+        return protonSender;
     }
 
-    protected final void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
+    private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
                 send.failed(send.createSendTimedOutException());
@@ -584,7 +156,7 @@ class ClientSender implements Sender {
         blocked.addLast(send);
     }
 
-    protected final void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
+    private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
         if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
             send.sendTimeout(executor.schedule(() -> {
                 send.failed(send.createSendTimedOutException());
@@ -594,7 +166,7 @@ class ClientSender implements Sender {
         blocked.addFirst(send);
     }
 
-    protected Tracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
+    private Tracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
         final ClientFuture<Tracker> operation = session.getFutureFactory().createFuture();
         final ProtonBuffer buffer = message.encode(deliveryAnnotations);
 
@@ -619,84 +191,39 @@ class ClientSender implements Sender {
         return session.request(this, operation);
     }
 
-    protected Tracker createTracker(OutgoingDelivery delivery) {
+    private Tracker createTracker(OutgoingDelivery delivery) {
         return new ClientTracker(this, delivery);
     }
 
-    protected Tracker createNoOpTracker() {
+    private Tracker createNoOpTracker() {
         return new ClientNoOpTracker(this);
     }
 
-    protected boolean notClosedOrFailed(ClientFuture<?> request) {
-        return notClosedOrFailed(request, protonSender);
-    }
-
-    protected boolean notClosedOrFailed(ClientFuture<?> request, org.apache.qpid.protonj2.engine.Sender sender) {
-        if (isClosed()) {
-            request.failed(new ClientIllegalStateException("The Sender was explicitly closed", failureCause));
-            return false;
-        } else if (failureCause != null) {
-            request.failed(failureCause);
-            return false;
-        } else if (sender.isLocallyClosedOrDetached()) {
-            if (sender.getConnection().getRemoteCondition() != null) {
-                request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getConnection().getRemoteCondition()));
-            } else if (sender.getSession().getRemoteCondition() != null) {
-                request.failed(ClientExceptionSupport.convertToSessionClosedException(sender.getSession().getRemoteCondition()));
-            } else if (sender.getEngine().failureCause() != null) {
-                request.failed(ClientExceptionSupport.convertToConnectionClosedException(sender.getEngine().failureCause()));
-            } else {
-                request.failed(new ClientIllegalStateException("Sender closed without a specific error condition"));
-            }
-            return false;
-        } else {
-            return true;
-        }
+    @Override
+    protected void linkSpecificLocalOpenHandler() {
+        protonSender.creditStateUpdateHandler(this::handleCreditStateUpdated);
     }
 
-    protected void checkClosedOrFailed() throws ClientException {
-        if (isClosed()) {
-            throw new ClientIllegalStateException("The Sender was explicitly closed", failureCause);
-        } else if (failureCause != null) {
-            throw failureCause;
+    @Override
+    protected void recreateLinkForReconnect() {
+        protonSender.localCloseHandler(null);
+        protonSender.localDetachHandler(null);
+        protonSender.close();
+        if (protonSender.hasUnsettled()) {
+            failPendingUnsettledAndBlockedSends(
+                new ClientConnectionRemotelyClosedException("Connection failed and send result is unknown"));
         }
+        protonSender = ClientSenderBuilder.recreateSender(session, protonSender, options);
+        protonSender.setLinkedResource(this);
     }
 
-    private void immediateLinkShutdown(ClientException failureCause) {
-        if (this.failureCause == null) {
-            this.failureCause = failureCause;
-        }
-
-        try {
-            if (protonSender.isRemotelyDetached()) {
-                protonSender.detach();
-            } else {
-                protonSender.close();
-            }
-        } catch (Throwable ignore) {
-            // Ignore
-        } finally {
-            // If the parent of this sender is a stream session then this sender owns it
-            // and must close it when it closes itself to ensure that the resources are
-            // cleaned up on the remote for the session.
-            if (session instanceof ClientStreamSession) {
-                session.closeAsync();
-            }
-        }
-
+    @Override
+    protected void linkSpecificCleanupHandler(ClientException failureCause) {
         if (failureCause != null) {
             failPendingUnsettledAndBlockedSends(failureCause);
         } else {
             failPendingUnsettledAndBlockedSends(new ClientResourceRemotelyClosedException("The sender link has closed"));
         }
-
-        if (failureCause != null) {
-            openFuture.failed(failureCause);
-        } else {
-            openFuture.complete(this);
-        }
-
-        closeFuture.complete(this);
     }
 
     private void failPendingUnsettledAndBlockedSends(ClientException cause) {
@@ -715,5 +242,141 @@ class ClientSender implements Sender {
             return true;
         });
     }
-}
 
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        // Nothing needed for sender handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteOpenHandler() {
+        // Nothing needed for sender handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteCloseHandler() {
+        // Nothing needed for sender handling
+    }
+
+    //----- Internal envelope for deliveries to track potential partial sends etc.
+
+    private final static class ClientOutgoingEnvelope implements ClientTransactionContext.Sendable {
+
+        private final ProtonBuffer payload;
+        private final ClientFuture<Tracker> request;
+        private final ClientSender sender;
+        private final int messageFormat;
+
+        private ScheduledFuture<?> sendTimeout;
+        private OutgoingDelivery delivery;
+
+        /**
+         * Create a new In-flight Send instance for a complete message send.  No further
+         * sends can occur after the send completes.
+         *
+         * @param sender
+         *      The {@link ClientSender} instance that is attempting to send this encoded message.
+         * @param messageFormat
+         *      The message format code to assign the send if this is the first delivery.
+         * @param payload
+         *      The payload that comprises this portion of the send.
+         * @param request
+         *      The requesting operation that initiated this send.
+         */
+        ClientOutgoingEnvelope(ClientSender sender, int messageFormat, ProtonBuffer payload, ClientFuture<Tracker> request) {
+            this.messageFormat = messageFormat;
+            this.payload = payload;
+            this.request = request;
+            this.sender = sender;
+        }
+
+        /**
+         * @return the {@link ScheduledFuture} used to determine when the send should fail if no credit is available to write.
+         */
+        public ScheduledFuture<?> sendTimeout() {
+            return sendTimeout;
+        }
+
+        /**
+         * Sets the {@link ScheduledFuture} which should be used when a send cannot be immediately performed.
+         *
+         * @param sendTimeout
+         *      The {@link ScheduledFuture} that will fail the send if not cancelled once it has been performed.
+         */
+        public void sendTimeout(ScheduledFuture<?> sendTimeout) {
+            this.sendTimeout = sendTimeout;
+        }
+
+        public OutgoingDelivery delivery() {
+            return delivery;
+        }
+
+        public ClientOutgoingEnvelope succeeded() {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+            }
+
+            request.complete(delivery.getLinkedResource());
+
+            return this;
+        }
+
+        public ClientOutgoingEnvelope failed(ClientException exception) {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+            }
+
+            request.failed(exception);
+
+            return this;
+        }
+
+        @Override
+        public void discard() {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+                sendTimeout = null;
+            }
+
+            if (delivery != null) {
+                ClientTracker tracker = delivery.getLinkedResource();
+                if (tracker != null) {
+                    tracker.settlementFuture().complete(tracker);
+                }
+                request.complete(delivery.getLinkedResource());
+            } else {
+                request.complete(sender.createNoOpTracker());
+            }
+        }
+
+        @Override
+        public void send(DeliveryState state, boolean settled) {
+            if (delivery == null) {
+                delivery = sender.getProtonSender().next();
+                delivery.setLinkedResource(sender.createTracker(delivery));
+            }
+
+            if (delivery.getTransferCount() == 0) {
+                delivery.setMessageFormat(messageFormat);
+                delivery.disposition(state, settled);
+            }
+
+            sender.connection().autoFlushOff();
+            try {
+                delivery.streamBytes(payload, true);
+                if (payload != null && payload.isReadable()) {
+                    sender.addToHeadOfBlockedQueue(this);
+                } else {
+                    succeeded();
+                }
+                sender.connection().flush();
+            } finally {
+                sender.connection().autoFlushOn();
+            }
+        }
+
+        public ClientException createSendTimedOutException() {
+            return new ClientSendTimedOutException("Timed out waiting for credit to send");
+        }
+    }
+}
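
Reviewer note on the blocked-send handling above: the hunks show each envelope getting a send timeout armed at most once when it is queued (head or tail), and that timer being cancelled when the send succeeds or fails. The following is a minimal, self-contained sketch of that idea only. `BlockedSendSketch`, `Envelope`, `armTimeout` and `creditGranted` are hypothetical names for illustration and are not part of the protonj2 client; the sketch also leaves a timed-out envelope in the queue and does not model the client's event loop threading.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedSendSketch {

    /** Hypothetical stand-in for the envelope that tracks one pending send. */
    static final class Envelope {
        final CompletableFuture<String> request = new CompletableFuture<>();
        ScheduledFuture<?> sendTimeout;

        void succeeded() {
            cancelTimeout();
            request.complete("sent");
        }

        void failed(Exception cause) {
            cancelTimeout();
            request.completeExceptionally(cause);
        }

        private void cancelTimeout() {
            if (sendTimeout != null) {
                sendTimeout.cancel(false);
            }
        }
    }

    private final Deque<Envelope> blocked = new ArrayDeque<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long sendTimeoutMillis;

    BlockedSendSketch(long sendTimeoutMillis) {
        this.sendTimeoutMillis = sendTimeoutMillis;
    }

    void addToTailOfBlockedQueue(Envelope send) {
        armTimeout(send);
        blocked.addLast(send);
    }

    void addToHeadOfBlockedQueue(Envelope send) {
        armTimeout(send);
        blocked.addFirst(send);
    }

    // Arm the timer only once per envelope, so an envelope that is queued
    // again (for example after a partial write) keeps its original deadline.
    private void armTimeout(Envelope send) {
        if (sendTimeoutMillis > 0 && send.sendTimeout == null) {
            send.sendTimeout = executor.schedule(
                () -> send.failed(new TimeoutException("Timed out waiting for credit to send")),
                sendTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    // Simulates credit arriving: the oldest blocked send is completed.
    boolean creditGranted() {
        Envelope next = blocked.pollFirst();
        if (next == null) {
            return false;
        }
        next.succeeded();
        return true;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        BlockedSendSketch sender = new BlockedSendSketch(60_000);
        Envelope envelope = new Envelope();
        sender.addToTailOfBlockedQueue(envelope);
        sender.creditGranted();
        System.out.println(envelope.request.get() + ", timer cancelled: " + envelope.sendTimeout.isCancelled());
        sender.shutdown();
    }
}
```

The `sendTimeout == null` guard is the part worth noticing: without it, putting an envelope back at the head of the queue would restart its deadline each time.
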
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
index 8fe6e945..62f4a0dc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderBuilder.java
@@ -18,6 +18,7 @@ package org.apache.qpid.protonj2.client.impl;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.qpid.protonj2.client.LinkOptions;
 import org.apache.qpid.protonj2.client.SenderOptions;
 import org.apache.qpid.protonj2.client.SessionOptions;
 import org.apache.qpid.protonj2.client.SourceOptions;
@@ -78,7 +79,7 @@ final class ClientSenderBuilder {
         return new ClientStreamSender(session, options, senderId, protonSender);
     }
 
-    private static Sender createSender(Session protonSession, String address, SenderOptions options, String senderId) {
+    private static Sender createSender(Session protonSession, String address, LinkOptions<?> options, String senderId) {
         final String linkName;
 
         if (options.linkName() != null) {
@@ -116,7 +117,7 @@ final class ClientSenderBuilder {
         return protonSender;
     }
 
-    private static Source createSource(String address, SenderOptions options) {
+    private static Source createSource(String address, LinkOptions<?> options) {
         final SourceOptions sourceOptions = options.sourceOptions();
         final Source source = new Source();
 
@@ -151,7 +152,7 @@ final class ClientSenderBuilder {
         return source;
     }
 
-    private static Target createTarget(String address, SenderOptions options) {
+    private static Target createTarget(String address, LinkOptions<?> options) {
         final TargetOptions targetOptions = options.targetOptions();
         final Target target = new Target();
 
@@ -171,7 +172,7 @@ final class ClientSenderBuilder {
         return target;
     }
 
-    public static Sender recreateSender(ClientSession session, Sender previousSender, SenderOptions options) {
+    public static Sender recreateSender(ClientSession session, Sender previousSender, LinkOptions<?> options) {
         final Sender protonSender = session.getProtonSession().sender(previousSender.getName());
 
         protonSender.setSource(previousSender.getSource());
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
new file mode 100644
index 00000000..afc1ef51
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSenderLinkType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import org.apache.qpid.protonj2.client.Link;
+import org.apache.qpid.protonj2.client.LinkOptions;
+import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
+import org.apache.qpid.protonj2.engine.LinkState;
+import org.apache.qpid.protonj2.engine.Sender;
+
+/**
+ * Base type for all the proton client sender types which provides a few extra
+ * APIs for the connection and session to use when managing senders.
+ */
+public abstract class ClientSenderLinkType<LinkType extends Link<LinkType>> extends ClientLinkType<LinkType, Sender> {
+
+    protected ClientSenderLinkType(ClientSession session, String linkId, LinkOptions<?> options) {
+        super(session, linkId, options);
+    }
+
+    final boolean isAnonymous() {
+        return protonLink().<org.apache.qpid.protonj2.types.messaging.Target>getTarget().getAddress() == null;
+    }
+
+    final void handleAnonymousRelayNotSupported() {
+        if (isAnonymous() && protonLink().getState() == LinkState.IDLE) {
+            immediateLinkShutdown(new ClientUnsupportedOperationException("Anonymous relay support not available from this connection"));
+        }
+    }
+}
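
Reviewer note on the new base types: the commit moves shared link lifecycle code into a generic base (`ClientLinkType`) that is parameterized on the link's own type and calls subclass hooks such as `self()` and `linkSpecificLocalOpenHandler()`. A minimal sketch of that self-typed, template-method shape follows. All names here (`Link`, `LinkBase`, `DemoSender`) are hypothetical stand-ins written for illustration, not the actual protonj2 classes, and the real base class does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfTypedLinkSketch {

    /** Hypothetical stand-in for a self-typed public link interface. */
    interface Link<T extends Link<T>> {
        T open();
    }

    /** Hypothetical base class holding the lifecycle code shared by all links. */
    abstract static class LinkBase<T extends Link<T>> implements Link<T> {
        final List<String> events = new ArrayList<>();

        // Subclasses return 'this' so shared methods can return the precise type.
        protected abstract T self();

        // Hook invoked by the shared open logic for link-specific setup.
        protected abstract void linkSpecificLocalOpenHandler();

        @Override
        public final T open() {
            events.add("common-open");
            linkSpecificLocalOpenHandler();
            return self();
        }
    }

    /** Hypothetical concrete link that only supplies the link-specific pieces. */
    static final class DemoSender extends LinkBase<DemoSender> {
        @Override
        protected DemoSender self() {
            return this;
        }

        @Override
        protected void linkSpecificLocalOpenHandler() {
            events.add("sender-credit-handler");
        }
    }

    public static void main(String[] args) {
        DemoSender sender = new DemoSender().open();
        System.out.println(sender.events); // [common-open, sender-credit-handler]
    }
}
```

In this sketch the type parameter is the concrete class, so `open()` needs no cast. In the diff the parameter appears to be the public interface type (for example `StreamReceiver`), which would explain the casts added to the `internalOpen...` methods in `ClientSession`.
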
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
index 1681d963..9455fcb7 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSession.java
@@ -354,23 +354,23 @@ public class ClientSession implements Session {
     //----- Internal resource open APIs expected to be called from the connection event loop
 
     ClientReceiver internalOpenReceiver(String address, ReceiverOptions receiverOptions) throws ClientException {
-        return receiverBuilder.receiver(address, receiverOptions).open();
+        return (ClientReceiver) receiverBuilder.receiver(address, receiverOptions).open();
     }
 
     ClientStreamReceiver internalOpenStreamReceiver(String address, StreamReceiverOptions receiverOptions) throws ClientException {
-        return receiverBuilder.streamReceiver(address, receiverOptions).open();
+        return (ClientStreamReceiver) receiverBuilder.streamReceiver(address, receiverOptions).open();
     }
 
     ClientReceiver internalOpenDurableReceiver(String address, String subscriptionName, ReceiverOptions receiverOptions) throws ClientException {
-        return receiverBuilder.durableReceiver(address, subscriptionName, receiverOptions).open();
+        return (ClientReceiver) receiverBuilder.durableReceiver(address, subscriptionName, receiverOptions).open();
     }
 
     ClientReceiver internalOpenDynamicReceiver(Map<String, Object> dynamicNodeProperties, ReceiverOptions receiverOptions) throws ClientException {
-        return receiverBuilder.dynamicReceiver(dynamicNodeProperties, receiverOptions).open();
+        return (ClientReceiver) receiverBuilder.dynamicReceiver(dynamicNodeProperties, receiverOptions).open();
     }
 
     ClientSender internalOpenSender(String address, SenderOptions senderOptions) throws ClientException {
-        return senderBuilder.sender(address, senderOptions).open();
+        return (ClientSender) senderBuilder.sender(address, senderOptions).open();
     }
 
     ClientSender internalOpenAnonymousSender(SenderOptions senderOptions) throws ClientException {
@@ -378,14 +378,14 @@ public class ClientSession implements Session {
         // and open the sender if so, otherwise we need to wait.
         if (connection.openFuture().isDone()) {
             connection.checkAnonymousRelaySupported();
-            return senderBuilder.anonymousSender(senderOptions).open();
+            return (ClientSender) senderBuilder.anonymousSender(senderOptions).open();
         } else {
             return senderBuilder.anonymousSender(senderOptions);
         }
     }
 
     ClientStreamSender internalOpenStreamSender(String address, StreamSenderOptions senderOptions) throws ClientException {
-        return senderBuilder.streamSender(address, senderOptions).open();
+        return (ClientStreamSender) senderBuilder.streamSender(address, senderOptions).open();
     }
 
     //----- Internal API accessible for use within the package
@@ -521,7 +521,7 @@ public class ClientSession implements Session {
 
         session.senders().forEach(sender -> {
             if (!sender.isLocallyOpen()) {
-                ClientSender clientSender = sender.getLinkedResource();
+                ClientSenderLinkType<?> clientSender = sender.getLinkedResource();
                 if (connection.getCapabilities().anonymousRelaySupported()) {
                     clientSender.open();
                 } else {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
index c512a8c4..8b249f9c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiver.java
@@ -20,30 +20,20 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.qpid.protonj2.client.ErrorCondition;
-import org.apache.qpid.protonj2.client.Receiver;
-import org.apache.qpid.protonj2.client.Source;
 import org.apache.qpid.protonj2.client.StreamDelivery;
 import org.apache.qpid.protonj2.client.StreamReceiver;
 import org.apache.qpid.protonj2.client.StreamReceiverOptions;
-import org.apache.qpid.protonj2.client.Target;
-import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
 import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
-import org.apache.qpid.protonj2.engine.Connection;
-import org.apache.qpid.protonj2.engine.Engine;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.engine.Receiver;
 import org.apache.qpid.protonj2.types.messaging.Released;
 import org.apache.qpid.protonj2.types.transport.DeliveryState;
 import org.slf4j.Logger;
@@ -52,36 +42,21 @@ import org.slf4j.LoggerFactory;
 /**
  * Client implementation of a {@link StreamReceiver}.
  */
-public final class ClientStreamReceiver implements StreamReceiver {
+public final class ClientStreamReceiver extends ClientLinkType<StreamReceiver, org.apache.qpid.protonj2.engine.Receiver> implements StreamReceiver {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientReceiver.class);
 
-    private static final AtomicIntegerFieldUpdater<ClientStreamReceiver> CLOSED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ClientStreamReceiver.class, "closed");
-
-    private final ClientFuture<Receiver> openFuture;
-    private final ClientFuture<Receiver> closeFuture;
-    private ClientFuture<Receiver> drainingFuture;
+    private ClientFuture<StreamReceiver> drainingFuture;
     private ScheduledFuture<?> drainingTimeout;
     private final StreamReceiverOptions options;
-    private final ClientSession session;
-    private final ScheduledExecutorService executor;
-    private final String receiverId;
     private final Map<ClientFuture<StreamDelivery>, ScheduledFuture<?>> receiveRequests = new LinkedHashMap<>();
 
     private org.apache.qpid.protonj2.engine.Receiver protonReceiver;
-    private volatile int closed;
-    private ClientException failureCause;
-    private volatile Source remoteSource;
-    private volatile Target remoteTarget;
 
     ClientStreamReceiver(ClientSession session, StreamReceiverOptions options, String receiverId, org.apache.qpid.protonj2.engine.Receiver receiver) {
+        super(session, receiverId, options);
+
         this.options = options;
-        this.session = session;
-        this.receiverId = receiverId;
-        this.executor = session.getScheduler();
-        this.openFuture = session.getFutureFactory().createFuture();
-        this.closeFuture = session.getFutureFactory().createFuture();
         this.protonReceiver = receiver.setLinkedResource(this);
 
         if (options.creditWindow() > 0) {
@@ -89,115 +64,6 @@ public final class ClientStreamReceiver implements StreamReceiver {
         }
     }
 
-    @Override
-    public ClientInstance client() {
-        return session.client();
-    }
-
-    @Override
-    public ClientConnection connection() {
-        return session.connection();
-    }
-
-    @Override
-    public ClientSession session() {
-        return session;
-    }
-
-    @Override
-    public ClientFuture<Receiver> openFuture() {
-        return openFuture;
-    }
-
-    @Override
-    public void close() {
-        try {
-            doCloseOrDetach(true, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void close(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(true, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach() {
-        try {
-            doCloseOrDetach(false, null).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public void detach(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        try {
-            doCloseOrDetach(false, error).get();
-        } catch (InterruptedException | ExecutionException e) {
-            Thread.interrupted();
-        }
-    }
-
-    @Override
-    public ClientFuture<Receiver> closeAsync() {
-        return doCloseOrDetach(true, null);
-    }
-
-    @Override
-    public ClientFuture<Receiver> closeAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "Error Condition cannot be null");
-
-        return doCloseOrDetach(true, error);
-    }
-
-    @Override
-    public ClientFuture<Receiver> detachAsync() {
-        return doCloseOrDetach(false, null);
-    }
-
-    @Override
-    public ClientFuture<Receiver> detachAsync(ErrorCondition error) {
-        Objects.requireNonNull(error, "The provided Error Condition cannot be null");
-
-        return doCloseOrDetach(false, error);
-    }
-
-    private ClientFuture<Receiver> doCloseOrDetach(boolean close, ErrorCondition error) {
-        if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) {
-            // Already closed by failure or shutdown so no need to queue task
-            if (!closeFuture.isDone()) {
-                executor.execute(() -> {
-                    if (protonReceiver.isLocallyOpen()) {
-                        try {
-                            protonReceiver.setCondition(ClientErrorCondition.asProtonErrorCondition(error));
-
-                            if (close) {
-                                protonReceiver.close();
-                            } else {
-                                protonReceiver.detach();
-                            }
-                        } catch (Throwable ignore) {
-                            closeFuture.complete(this);
-                        }
-                    }
-                });
-            }
-        }
-
-        return closeFuture;
-    }
-
     @Override
     public StreamDelivery receive() throws ClientException {
         return receive(-1, TimeUnit.MILLISECONDS);
@@ -280,9 +146,9 @@ public final class ClientStreamReceiver implements StreamReceiver {
     }
 
     @Override
-    public Future<Receiver> drain() throws ClientException {
+    public Future<StreamReceiver> drain() throws ClientException {
         checkClosedOrFailed();
-        final ClientFuture<Receiver> drainComplete = session.getFutureFactory().createFuture();
+        final ClientFuture<StreamReceiver> drainComplete = session.getFutureFactory().createFuture();
 
         executor.execute(() -> {
             if (notClosedOrFailed(drainComplete)) {
@@ -308,46 +174,6 @@ public final class ClientStreamReceiver implements StreamReceiver {
         return drainComplete;
     }
 
-    @Override
-    public Map<String, Object> properties() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringKeyedMap(protonReceiver.getRemoteProperties());
-    }
-
-    @Override
-    public String[] offeredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonReceiver.getRemoteOfferedCapabilities());
-    }
-
-    @Override
-    public String[] desiredCapabilities() throws ClientException {
-        waitForOpenToComplete();
-        return ClientConversionSupport.toStringArray(protonReceiver.getRemoteDesiredCapabilities());
-    }
-
-    @Override
-    public String address() throws ClientException {
-        if (isDynamic()) {
-            waitForOpenToComplete();
-            return protonReceiver.getRemoteSource().getAddress();
-        } else {
-            return protonReceiver.getSource() != null ? protonReceiver.getSource().getAddress() : null;
-        }
-    }
-
-    @Override
-    public Source source() throws ClientException {
-        waitForOpenToComplete();
-        return remoteSource;
-    }
-
-    @Override
-    public Target target() throws ClientException {
-        waitForOpenToComplete();
-        return remoteTarget;
-    }
-
     @Override
     public long queuedDeliveries() throws ClientException {
         checkClosedOrFailed();
@@ -374,167 +200,12 @@ public final class ClientStreamReceiver implements StreamReceiver {
 
     //----- Internal API for the ClientReceiver and other Client objects
 
-    ClientStreamReceiver open() {
-        protonReceiver.localOpenHandler(this::handleLocalOpen)
-                      .localCloseHandler(this::handleLocalCloseOrDetach)
-                      .localDetachHandler(this::handleLocalCloseOrDetach)
-                      .openHandler(this::handleRemoteOpen)
-                      .closeHandler(this::handleRemoteCloseOrDetach)
-                      .detachHandler(this::handleRemoteCloseOrDetach)
-                      .parentEndpointClosedHandler(this::handleParentEndpointClosed)
-                      .deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
-                      .deliveryReadHandler(this::handleDeliveryRead)
-                      .deliveryAbortedHandler(this::handleDeliveryAborted)
-                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated)
-                      .engineShutdownHandler(this::handleEngineShutdown)
-                      .open();
-
-        return this;
-    }
-
-    void setFailureCause(ClientException failureCause) {
-        this.failureCause = failureCause;
-    }
-
-    ClientException getFailureCause() {
-        if (failureCause == null) {
-            return session.getFailureCause();
-        } else {
-            return failureCause;
-        }
-    }
-
-    String getId() {
-        return receiverId;
-    }
-
-    boolean isClosed() {
-        return closed > 0;
-    }
-
-    boolean isDynamic() {
-        return protonReceiver.getSource() != null && protonReceiver.getSource().isDynamic();
-    }
-
     StreamReceiverOptions receiverOptions() {
         return options;
     }
 
     //----- Handlers for proton receiver events
 
-    private void handleLocalOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        if (options.openTimeout() > 0) {
-            executor.schedule(() -> {
-                if (!openFuture.isDone()) {
-                    immediateLinkShutdown(new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
-                }
-            }, options.openTimeout(), TimeUnit.MILLISECONDS);
-        }
-    }
-
-    private void handleLocalCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        // If not yet remotely closed we only wait for a remote close if the engine isn't
-        // already failed and we have successfully opened the sender without a timeout.
-        if (!receiver.getEngine().isShutdown() && failureCause == null && receiver.isRemotelyOpen()) {
-            final long timeout = options.closeTimeout();
-
-            if (timeout > 0) {
-                session.scheduleRequestTimeout(closeFuture, timeout, () ->
-                new ClientOperationTimedOutException("receiver close timed out waiting for remote to respond"));
-            }
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleRemoteOpen(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        // Check for deferred close pending and hold completion if so
-        if (receiver.getRemoteSource() != null) {
-            remoteSource = new ClientRemoteSource(receiver.getRemoteSource());
-
-            if (receiver.getRemoteTarget() != null) {
-                remoteTarget = new ClientRemoteTarget(receiver.getRemoteTarget());
-            }
-
-            replenishCreditIfNeeded();
-
-            openFuture.complete(this);
-            LOG.trace("Receiver opened successfully: {}", receiverId);
-        } else {
-            LOG.debug("Receiver opened but remote signalled close is pending: {}", receiverId);
-        }
-    }
-
-    private void handleRemoteCloseOrDetach(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        if (receiver.isLocallyOpen()) {
-            immediateLinkShutdown(ClientExceptionSupport.convertToLinkClosedException(
-                receiver.getRemoteCondition(), "Receiver remotely closed without explanation from the remote"));
-        } else {
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleParentEndpointClosed(org.apache.qpid.protonj2.engine.Receiver receiver) {
-        // Don't react if engine was shutdown and parent closed as a result instead wait to get the
-        // shutdown notification and respond to that change.
-        if (receiver.getEngine().isRunning()) {
-            final ClientException failureCause;
-
-            if (receiver.getConnection().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(receiver.getConnection().getRemoteCondition());
-            } else if (receiver.getSession().getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToSessionClosedException(receiver.getSession().getRemoteCondition());
-            } else if (receiver.getEngine().failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(receiver.getEngine().failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientResourceRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
-    private void handleEngineShutdown(Engine engine) {
-        if (!isDynamic() && !session.getConnection().getEngine().isShutdown()) {
-            int previousCredit = protonReceiver.getCredit() + protonReceiver.unsettled().size();
-
-            if (drainingFuture != null) {
-                drainingFuture.complete(this);
-                if (drainingTimeout != null) {
-                    drainingTimeout.cancel(false);
-                    drainingTimeout = null;
-                }
-            }
-
-            protonReceiver.localCloseHandler(null);
-            protonReceiver.localDetachHandler(null);
-            protonReceiver.close();
-            protonReceiver = ClientReceiverBuilder.recreateReceiver(session, protonReceiver, options);
-            protonReceiver.setLinkedResource(this);
-            protonReceiver.addCredit(previousCredit);
-
-            open();
-        } else {
-            final Connection connection = engine.connection();
-
-            final ClientException failureCause;
-
-            if (connection.getRemoteCondition() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition());
-            } else if (engine.failureCause() != null) {
-                failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause());
-            } else if (!isClosed()) {
-                failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
-            } else {
-                failureCause = null;
-            }
-
-            immediateLinkShutdown(failureCause);
-        }
-    }
-
     private void handleDeliveryRead(IncomingDelivery delivery) {
         LOG.trace("Delivery data was received: {}", delivery);
         if (delivery.getDefaultDeliveryState() == null) {
@@ -630,59 +301,12 @@ public final class ClientStreamReceiver implements StreamReceiver {
         }
     }
 
-    private void waitForOpenToComplete() throws ClientException {
-        if (!openFuture.isComplete() || openFuture.isFailed()) {
-            try {
-                openFuture.get();
-            } catch (ExecutionException | InterruptedException e) {
-                Thread.interrupted();
-                if (failureCause != null) {
-                    throw failureCause;
-                } else {
-                    throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause());
-                }
-            }
-        }
-    }
-
-    private boolean notClosedOrFailed(ClientFuture<?> request) {
-        if (isClosed()) {
-            request.failed(new ClientIllegalStateException("The Receiver was explicitly closed", failureCause));
-            return false;
-        } else if (failureCause != null) {
-            request.failed(failureCause);
-            return false;
-        } else {
-            return true;
-        }
-    }
-
-    private void checkClosedOrFailed() throws ClientException {
-        if (isClosed()) {
-            throw new ClientIllegalStateException("The Receiver was explicitly closed", failureCause);
-        } else if (failureCause != null) {
-            throw failureCause;
-        }
-    }
-
-    private void immediateLinkShutdown(ClientException failureCause) {
-        CLOSED_UPDATER.set(this, 1);
-
-        if (this.failureCause == null) {
-            this.failureCause = failureCause;
-        }
-
-        try {
-            if (protonReceiver.isRemotelyDetached()) {
-                protonReceiver.detach();
-            } else {
-                protonReceiver.close();
-            }
-        } catch (Exception ignore) {
-            // Ignore
-        } finally {
-            session.closeAsync();
-        }
+    @Override
+    protected void linkSpecificCleanupHandler(ClientException failureCause) {
+        // If the parent of this receiver is a stream session then this receiver owns it
+        // and must close it when it closes itself to ensure that the resources are
+        // cleaned up on the remote for the session.
+        session.closeAsync();
 
         receiveRequests.forEach((future, timeout) -> {
             if (timeout != null) {
@@ -704,23 +328,64 @@ public final class ClientStreamReceiver implements StreamReceiver {
             }
         });
 
-        if (failureCause != null) {
-            openFuture.failed(failureCause);
-            if (drainingFuture != null) {
-                drainingFuture.failed(failureCause);
-            }
-        } else {
-            openFuture.complete(this);
-            if (drainingFuture != null) {
-                drainingFuture.failed(new ClientResourceRemotelyClosedException("The Receiver has been closed"));
-            }
-        }
-
         if (drainingTimeout != null) {
+            drainingFuture.failed(
+                failureCause != null ? failureCause : new ClientResourceRemotelyClosedException("The Receiver has been closed"));
             drainingTimeout.cancel(false);
             drainingTimeout = null;
         }
+    }
+
+    @Override
+    protected StreamReceiver self() {
+        return this;
+    }
+
+    @Override
+    protected Receiver protonLink() {
+        return protonReceiver;
+    }
+
+    @Override
+    protected void linkSpecificLocalOpenHandler() {
+        protonReceiver.deliveryStateUpdatedHandler(this::handleDeliveryStateRemotelyUpdated)
+                      .deliveryReadHandler(this::handleDeliveryRead)
+                      .deliveryAbortedHandler(this::handleDeliveryAborted)
+                      .creditStateUpdateHandler(this::handleReceiverCreditUpdated);
+    }
+
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        // Nothing needed for local close handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteOpenHandler() {
+        replenishCreditIfNeeded();
+    }
+
+    @Override
+    protected void linkSpecificRemoteCloseHandler() {
+        // Nothing needed for remote close handling
+    }
+
+    @Override
+    protected void recreateLinkForReconnect() {
+        int previousCredit = protonReceiver.getCredit() + protonReceiver.unsettled().size();
+
+        if (drainingFuture != null) {
+            drainingFuture.complete(this);
+            if (drainingTimeout != null) {
+                drainingTimeout.cancel(false);
+                drainingTimeout = null;
+            }
+        }
 
-        closeFuture.complete(this);
+        protonReceiver.localCloseHandler(null);
+        protonReceiver.localDetachHandler(null);
+        protonReceiver.close();
+        protonReceiver = ClientReceiverBuilder.recreateReceiver(session, protonReceiver, options);
+        protonReceiver.setLinkedResource(this);
+        protonReceiver.addCredit(previousCredit);
     }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 491fe31c..2513098e 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -16,56 +16,77 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Map;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.client.AdvancedMessage;
 import org.apache.qpid.protonj2.client.Message;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderOptions;
 import org.apache.qpid.protonj2.client.StreamTracker;
-import org.apache.qpid.protonj2.client.Tracker;
+import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
+import org.apache.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
+import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
 import org.apache.qpid.protonj2.client.futures.ClientFuture;
+import org.apache.qpid.protonj2.client.futures.ClientSynchronization;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
+import org.apache.qpid.protonj2.engine.Sender;
 import org.apache.qpid.protonj2.engine.util.StringUtils;
 import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
+import org.apache.qpid.protonj2.types.transport.DeliveryState;
+import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Client implementation of a {@link StreamSender}.
  */
-public final class ClientStreamSender extends ClientSender implements StreamSender {
+public final class ClientStreamSender extends ClientLinkType<StreamSender, Sender> implements StreamSender {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamSender.class);
 
     private final StreamSenderOptions options;
+    private final boolean sendsSettled;
+    private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
+
+    private org.apache.qpid.protonj2.engine.Sender protonSender;
 
     ClientStreamSender(ClientSession session, StreamSenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
-        super(session, options, senderId, protonSender);
+        super(session, senderId, options);
 
         this.options = new StreamSenderOptions(options);
+        this.protonSender = protonSender.setLinkedResource(this);
+        this.sendsSettled = protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED;
     }
 
     @Override
     public StreamTracker send(Message<?> message) throws ClientException {
         checkClosedOrFailed();
-        return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, true);
+        return sendMessage(ClientMessageSupport.convertMessage(message), null, true);
     }
 
     @Override
     public StreamTracker send(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
         checkClosedOrFailed();
-        return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, true);
+        return sendMessage(ClientMessageSupport.convertMessage(message), deliveryAnnotations, true);
     }
 
     @Override
     public StreamTracker trySend(Message<?> message) throws ClientException {
         checkClosedOrFailed();
-        return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, false);
+        return sendMessage(ClientMessageSupport.convertMessage(message), null, false);
     }
 
     @Override
     public StreamTracker trySend(Message<?> message, Map<String, Object> deliveryAnnotations) throws ClientException {
         checkClosedOrFailed();
-        return (StreamTracker) sendMessage(ClientMessageSupport.convertMessage(message), null, false);
+        return sendMessage(ClientMessageSupport.convertMessage(message), deliveryAnnotations, false);
     }
 
     @Override
@@ -106,20 +127,77 @@ public final class ClientStreamSender extends ClientSender implements StreamSend
 
     //----- Internal API
 
-    @Override
+    boolean isSendingSettled() {
+        return sendsSettled;
+    }
+
     StreamSenderOptions options() {
         return this.options;
     }
 
     @Override
-    ClientStreamSender open() {
-        return (ClientStreamSender) super.open();
+    protected StreamSender self() {
+        return this;
+    }
+
+    Sender getProtonSender() {
+        return protonSender;
+    }
+
+    @Override
+    protected Sender protonLink() {
+        return protonSender;
+    }
+
+    private void addToTailOfBlockedQueue(ClientOutgoingEnvelope send) {
+        if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
+            send.sendTimeout(executor.schedule(() -> {
+                send.failed(send.createSendTimedOutException());
+            }, options.sendTimeout(), TimeUnit.MILLISECONDS));
+        }
+
+        blocked.addLast(send);
+    }
+
+    private void addToHeadOfBlockedQueue(ClientOutgoingEnvelope send) {
+        if (options.sendTimeout() > 0 && send.sendTimeout() == null) {
+            send.sendTimeout(executor.schedule(() -> {
+                send.failed(send.createSendTimedOutException());
+            }, options.sendTimeout(), TimeUnit.MILLISECONDS));
+        }
+
+        blocked.addFirst(send);
+    }
+
+    private StreamTracker sendMessage(AdvancedMessage<?> message, Map<String, Object> deliveryAnnotations, boolean waitForCredit) throws ClientException {
+        final ClientFuture<StreamTracker> operation = session.getFutureFactory().createFuture();
+        final ProtonBuffer buffer = message.encode(deliveryAnnotations);
+
+        executor.execute(() -> {
+            if (notClosedOrFailed(operation)) {
+                try {
+                    final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, null, message.messageFormat(), buffer, true, operation);
+
+                    if (protonSender.isSendable() && protonSender.current() == null) {
+                        session.getTransactionContext().send(envelope, null, protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED);
+                    } else if (waitForCredit) {
+                        addToTailOfBlockedQueue(envelope);
+                    } else {
+                        operation.complete(null);
+                    }
+                } catch (Exception error) {
+                    operation.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+                }
+            }
+        });
+
+        return session.request(this, operation);
     }
 
     StreamTracker sendMessage(ClientStreamSenderMessage context, ProtonBuffer payload, int messageFormat) throws ClientException {
         checkClosedOrFailed();
 
-        final ClientFuture<Tracker> operation = session.getFutureFactory().createFuture();
+        final ClientFuture<StreamTracker> operation = session.getFutureFactory().createFuture();
         final ProtonBuffer buffer = payload;
         final ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(
             this, context.getProtonDelivery(), messageFormat, buffer, context.completed(), operation);
@@ -138,16 +216,343 @@ public final class ClientStreamSender extends ClientSender implements StreamSend
             }
         });
 
-        return (StreamTracker) session.request(this, operation);
+        return session.request(this, operation);
     }
 
-    @Override
-    protected ClientStreamTracker createTracker(OutgoingDelivery delivery) {
+    private ClientStreamTracker createTracker(OutgoingDelivery delivery) {
         return new ClientStreamTracker(this, delivery);
     }
 
-    @Override
-    protected ClientNoOpStreamTracker createNoOpTracker() {
+    private ClientNoOpStreamTracker createNoOpTracker() {
         return new ClientNoOpStreamTracker(this);
     }
+
+    void disposition(OutgoingDelivery delivery, DeliveryState state, boolean settled) throws ClientException {
+        checkClosedOrFailed();
+        executor.execute(() -> {
+            delivery.disposition(state, settled);
+        });
+    }
+
+    void abort(OutgoingDelivery delivery, ClientStreamTracker tracker) throws ClientException {
+        checkClosedOrFailed();
+        ClientFuture<StreamTracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() {
+
+            @Override
+            public void onPendingSuccess(StreamTracker result) {
+                handleCreditStateUpdated(protonLink());
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+                handleCreditStateUpdated(protonLink());
+            }
+        });
+
+        executor.execute(() -> {
+            if (delivery.getTransferCount() == 0) {
+                delivery.abort();
+                request.complete(tracker);
+            } else {
+                ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, false, request).abort();
+                try {
+                    if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) {
+                        envelope.send(delivery.getState(), delivery.isSettled());
+                    } else {
+                        if (protonSender.current() == delivery) {
+                            addToHeadOfBlockedQueue(envelope);
+                        } else {
+                            addToTailOfBlockedQueue(envelope);
+                        }
+                    }
+                } catch (Exception error) {
+                    request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+                }
+            }
+        });
+
+        session.request(this, request);
+    }
+
+    void complete(OutgoingDelivery delivery, ClientStreamTracker tracker) throws ClientException {
+        checkClosedOrFailed();
+        ClientFuture<StreamTracker> request = session().getFutureFactory().createFuture(new ClientSynchronization<StreamTracker>() {
+
+            @Override
+            public void onPendingSuccess(StreamTracker result) {
+                handleCreditStateUpdated(protonLink());
+            }
+
+            @Override
+            public void onPendingFailure(Throwable cause) {
+                handleCreditStateUpdated(protonLink());
+            }
+        });
+
+        executor.execute(() -> {
+            ClientOutgoingEnvelope envelope = new ClientOutgoingEnvelope(this, delivery, delivery.getMessageFormat(), null, true, request);
+            try {
+                if (protonSender.isSendable() && (protonSender.current() == null || protonSender.current() == delivery)) {
+                    envelope.send(delivery.getState(), delivery.isSettled());
+                } else {
+                    if (protonSender.current() == delivery) {
+                        addToHeadOfBlockedQueue(envelope);
+                    } else {
+                        addToTailOfBlockedQueue(envelope);
+                    }
+                }
+            } catch (Exception error) {
+                request.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+            }
+        });
+
+        session.request(this, request);
+    }
+
+    //----- Handlers for proton sender events
+
+    private void handleCreditStateUpdated(org.apache.qpid.protonj2.engine.Sender sender) {
+        if (!blocked.isEmpty()) {
+            while (sender.isSendable() && !blocked.isEmpty()) {
+                ClientOutgoingEnvelope held = blocked.peek();
+                if (held.delivery() == protonSender.current()) {
+                    LOG.trace("Dispatching previously held send");
+                    try {
+                        // We don't currently allow a sender to define any outcome so we pass null for
+                        // now, however a transaction context will apply its TransactionalState outcome
+                        // and would wrap anything we passed in the future.
+                        session.getTransactionContext().send(held, null, isSendingSettled());
+                    } catch (Exception error) {
+                        held.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error));
+                    } finally {
+                        blocked.poll();
+                    }
+                } else {
+                    break;
+                }
+            }
+        }
+
+        if (sender.isDraining() && sender.current() == null && blocked.isEmpty()) {
+            sender.drained();
+        }
+    }
+
+    @Override
+    protected void linkSpecificLocalOpenHandler() {
+        protonSender.creditStateUpdateHandler(this::handleCreditStateUpdated);
+    }
+
+    @Override
+    protected void recreateLinkForReconnect() {
+        protonSender.localCloseHandler(null);
+        protonSender.localDetachHandler(null);
+        protonSender.close();
+        if (protonSender.hasUnsettled()) {
+            failPendingUnsettledAndBlockedSends(
+                new ClientConnectionRemotelyClosedException("Connection failed and send result is unknown"));
+        }
+        protonSender = ClientSenderBuilder.recreateSender(session, protonSender, options);
+        protonSender.setLinkedResource(this);
+    }
+
+    @Override
+    protected void linkSpecificCleanupHandler(ClientException failureCause) {
+        // If the parent of this sender is a stream session then this sender owns it
+        // and must close it when it closes itself to ensure that the resources are
+        // cleaned up on the remote for the session.
+        if (session instanceof ClientStreamSession) {
+            session.closeAsync();
+        }
+
+        if (failureCause != null) {
+            failPendingUnsettledAndBlockedSends(failureCause);
+        } else {
+            failPendingUnsettledAndBlockedSends(new ClientResourceRemotelyClosedException("The sender link has closed"));
+        }
+    }
+
+    private void failPendingUnsettledAndBlockedSends(ClientException cause) {
+        // Cancel all settlement futures for in-flight sends passing an appropriate error to the future
+        protonSender.unsettled().forEach((delivery) -> {
+            try {
+                final ClientTracker tracker = delivery.getLinkedResource();
+                tracker.settlementFuture().failed(cause);
+            } catch (Exception e) {
+            }
+        });
+
+        // Cancel all blocked sends passing an appropriate error to the future
+        blocked.removeIf((held) -> {
+            held.failed(cause);
+            return true;
+        });
+    }
+
+    @Override
+    protected void linkSpecificLocalCloseHandler() {
+        // Nothing needed for sender handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteOpenHandler() {
+        // Nothing needed for sender handling
+    }
+
+    @Override
+    protected void linkSpecificRemoteCloseHandler() {
+        // Nothing needed for sender handling
+    }
+
+    //----- Internal envelope for deliveries to track potential partial sends etc.
+
+    public static final class ClientOutgoingEnvelope implements ClientTransactionContext.Sendable {
+
+        private final ProtonBuffer payload;
+        private final ClientFuture<StreamTracker> request;
+        private final ClientStreamSender sender;
+        private final boolean complete;
+        private final int messageFormat;
+
+        private boolean aborted;
+        private ScheduledFuture<?> sendTimeout;
+        private OutgoingDelivery delivery;
+
+        /**
+         * Create a new In-flight Send instance that is a continuation on an existing delivery.
+         *
+         * @param sender
+         *      The {@link ClientStreamSender} instance that is attempting to send this encoded message.
+         * @param delivery
+         *      The {@link OutgoingDelivery} context this envelope will be added to.
+         * @param messageFormat
+         *      The message format code to assign the send if this is the first delivery.
+         * @param payload
+         *      The payload that comprises this portion of the send.
+         * @param complete
+         *      Indicates if the encoded payload represents the complete transfer or if more is coming.
+         * @param request
+         *      The requesting operation that initiated this send.
+         */
+        public ClientOutgoingEnvelope(ClientStreamSender sender, OutgoingDelivery delivery, int messageFormat, ProtonBuffer payload, boolean complete, ClientFuture<StreamTracker> request) {
+            this.payload = payload;
+            this.request = request;
+            this.sender = sender;
+            this.complete = complete;
+            this.messageFormat = messageFormat;
+            this.delivery = delivery;
+        }
+
+        /**
+         * @return the {@link ScheduledFuture} used to determine when the send should fail if no credit available to write.
+         */
+        public ScheduledFuture<?> sendTimeout() {
+            return sendTimeout;
+        }
+
+        /**
+         * Sets the {@link ScheduledFuture} which should be used when a send cannot be immediately performed.
+         *
+         * @param sendTimeout
+         *      The {@link ScheduledFuture} that will fail the send if not cancelled once it has been performed.
+         */
+        public void sendTimeout(ScheduledFuture<?> sendTimeout) {
+            this.sendTimeout = sendTimeout;
+        }
+
+        public ProtonBuffer payload() {
+            return payload;
+        }
+
+        public OutgoingDelivery delivery() {
+            return delivery;
+        }
+
+        public ClientOutgoingEnvelope abort() {
+            this.aborted = true;
+            return this;
+        }
+
+        public boolean aborted() {
+            return aborted;
+        }
+
+        @Override
+        public void discard() {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+                sendTimeout = null;
+            }
+
+            if (delivery != null) {
+                ClientTracker tracker = delivery.getLinkedResource();
+                if (tracker != null) {
+                    tracker.settlementFuture().complete(tracker);
+                }
+                request.complete(delivery.getLinkedResource());
+            } else {
+                request.complete(sender.createNoOpTracker());
+            }
+        }
+
+        public ClientOutgoingEnvelope succeeded() {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+            }
+
+            request.complete(delivery.getLinkedResource());
+
+            return this;
+        }
+
+        public ClientOutgoingEnvelope failed(ClientException exception) {
+            if (sendTimeout != null) {
+                sendTimeout.cancel(true);
+            }
+
+            request.failed(exception);
+
+            return this;
+        }
+
+        @Override
+        public void send(DeliveryState state, boolean settled) {
+            if (delivery == null) {
+                delivery = sender.protonLink().next();
+                delivery.setLinkedResource(sender.createTracker(delivery));
+            }
+
+            if (delivery.getTransferCount() == 0) {
+                delivery.setMessageFormat(messageFormat);
+                delivery.disposition(state, settled);
+            }
+
+            // We must check if the delivery was fully written and then complete the send operation otherwise
+            // if the session capacity limited the amount of payload data we need to hold the completion until
+            // the session capacity is refilled and we can fully write the remaining message payload.  This
+            // area could use some enhancement to allow control of write and flush when dealing with delivery
+            // modes that have low assurance versus those that are strict.
+            if (aborted()) {
+                delivery.abort();
+                succeeded();
+            } else {
+                sender.connection().autoFlushOff();
+                try {
+                    delivery.streamBytes(payload, complete);
+                    if (payload != null && payload.isReadable()) {
+                        sender.addToHeadOfBlockedQueue(this);
+                    } else {
+                        succeeded();
+                    }
+                    sender.connection().flush();
+                } finally {
+                    sender.connection().autoFlushOn();
+                }
+            }
+        }
+
+        public ClientException createSendTimedOutException() {
+            return new ClientSendTimedOutException("Timed out waiting for credit to send");
+        }
+    }
 }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
index 07787553..4906ea0c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSenderMessage.java
@@ -98,7 +98,7 @@ final class ClientStreamSenderMessage implements StreamSenderMessage {
             writeBufferSize = Math.max(StreamSenderOptions.MIN_BUFFER_SIZE_LIMIT, sender.options().writeBufferSize());
         } else {
             writeBufferSize = Math.max(StreamSenderOptions.MIN_BUFFER_SIZE_LIMIT,
-                                  (int) sender.getProtonSender().getConnection().getMaxFrameSize());
+                                  (int) sender.protonLink().getConnection().getMaxFrameSize());
         }
     }
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
index c36cc36e..0d5c3736 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamTracker.java
@@ -16,46 +16,192 @@
  */
 package org.apache.qpid.protonj2.client.impl;
 
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamTracker;
+import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
+import org.apache.qpid.protonj2.client.futures.ClientFuture;
 import org.apache.qpid.protonj2.engine.OutgoingDelivery;
 
 /**
  * {@link StreamTracker} implementation that relies on the ClientTracker to handle the
  * basic {@link OutgoingDelivery} management.
  */
-public final class ClientStreamTracker extends ClientTracker implements StreamTracker {
+public final class ClientStreamTracker implements StreamTracker {
+
+    private final ClientStreamSender sender;
+    private final OutgoingDelivery delivery;
+
+    private final ClientFuture<StreamTracker> remoteSettlementFuture;
+
+    private volatile boolean remotelySettled;
+    private volatile DeliveryState remoteDeliveryState;
 
     ClientStreamTracker(ClientStreamSender sender, OutgoingDelivery delivery) {
-        super(sender, delivery);
+        this.sender = sender;
+        this.delivery = delivery;
+        this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
+        this.remoteSettlementFuture = sender.session().getFutureFactory().createFuture();
+    }
+
+    OutgoingDelivery delivery() {
+        return delivery;
     }
 
     @Override
     public StreamSender sender() {
-        return (StreamSender) super.sender();
+        return sender;
+    }
+
+    @Override
+    public synchronized DeliveryState state() {
+        return ClientDeliveryState.fromProtonType(delivery.getState());
+    }
+
+    @Override
+    public DeliveryState remoteState() {
+        return remoteDeliveryState;
+    }
+
+    @Override
+    public boolean remoteSettled() {
+        return remotelySettled;
     }
 
     @Override
     public StreamTracker disposition(DeliveryState state, boolean settle) throws ClientException {
-        return (StreamTracker) super.disposition(state, settle);
+        try {
+            sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
+        } finally {
+            if (settle) {
+                remoteSettlementFuture.complete(this);
+            }
+        }
+
+        return this;
     }
 
     @Override
     public StreamTracker settle() throws ClientException {
-        return (StreamTracker) super.settle();
+        try {
+            sender.disposition(delivery, null, true);
+        } finally {
+            remoteSettlementFuture.complete(this);
+        }
+
+        return this;
+    }
+
+    @Override
+    public synchronized boolean settled() {
+        return delivery.isSettled();
+    }
+
+    @Override
+    public ClientFuture<StreamTracker> settlementFuture() {
+        if (delivery.isSettled()) {
+            remoteSettlementFuture.complete(this);
+        }
+
+        return remoteSettlementFuture;
     }
 
     @Override
     public StreamTracker awaitSettlement() throws ClientException {
-        return (StreamTracker) super.awaitSettlement();
+        try {
+            if (settled()) {
+                return this;
+            } else {
+                return settlementFuture().get();
+            }
+        } catch (ExecutionException exe) {
+            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw new ClientException("Wait for settlement was interrupted", e);
+        }
     }
 
     @Override
     public StreamTracker awaitSettlement(long timeout, TimeUnit unit) throws ClientException {
-        return (StreamTracker) super.awaitSettlement(timeout, unit);
+        try {
+            if (settled()) {
+                return this;
+            } else {
+                return settlementFuture().get(timeout, unit);
+            }
+        } catch (InterruptedException ie) {
+            Thread.interrupted();
+            throw new ClientException("Wait for settlement was interrupted", ie);
+        } catch (ExecutionException exe) {
+            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
+        } catch (TimeoutException te) {
+            throw new ClientOperationTimedOutException("Timed out waiting for remote settlement", te);
+        }
+    }
+
+    @Override
+    public StreamTracker awaitAccepted() throws ClientException {
+        try {
+            if (settled() && !remoteSettled()) {
+                return this;
+            } else {
+                settlementFuture().get();
+                if (remoteState() != null && remoteState().isAccepted()) {
+                    return this;
+                } else {
+                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
+                }
+            }
+        } catch (ExecutionException exe) {
+            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
+        } catch (InterruptedException ie) {
+            Thread.interrupted();
+            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
+        }
+    }
+
+    @Override
+    public StreamTracker awaitAccepted(long timeout, TimeUnit unit) throws ClientException {
+        try {
+            if (settled() && !remoteSettled()) {
+                return this;
+            } else {
+                settlementFuture().get(timeout, unit);
+                if (remoteState() != null && remoteState().isAccepted()) {
+                    return this;
+                } else {
+                    throw new ClientDeliveryStateException("Remote did not accept the sent message", remoteState());
+                }
+            }
+        } catch (InterruptedException ie) {
+            Thread.interrupted();
+            throw new ClientException("Wait for Accepted outcome was interrupted", ie);
+        } catch (ExecutionException exe) {
+            throw ClientExceptionSupport.createNonFatalOrPassthrough(exe.getCause());
+        } catch (TimeoutException te) {
+            throw new ClientOperationTimedOutException("Timed out waiting for remote Accepted outcome", te);
+        }
+    }
+
+    //----- Internal Event hooks for delivery updates
+
+    private void processDeliveryUpdated(OutgoingDelivery delivery) {
+        remotelySettled = delivery.isRemotelySettled();
+        remoteDeliveryState = ClientDeliveryState.fromProtonType(delivery.getRemoteState());
+
+        if (delivery.isRemotelySettled()) {
+            remoteSettlementFuture.complete(this);
+        }
+
+        if (sender.options().autoSettle() && delivery.isRemotelySettled()) {
+            delivery.settle();
+        }
     }
 }
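
Editorial sketch of the settlement-wait pattern that the new ClientStreamTracker methods above follow: check the settled flag first, otherwise block on a future and translate the standard java.util.concurrent exceptions. This is a minimal illustration under stated assumptions, not the client's code. java.util.concurrent.CompletableFuture stands in for ClientFuture, and the class SettlementWaitSketch and its nested WaitTimedOutException are hypothetical names. The sketch also restores the interrupt flag with Thread.currentThread().interrupt(), which is a choice made here; the commit itself calls Thread.interrupted().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SettlementWaitSketch {

    /** Hypothetical stand-in for a client "operation timed out" exception. */
    public static class WaitTimedOutException extends Exception {
        public WaitTimedOutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Assumption: a plain CompletableFuture replaces the client's future type.
    private final CompletableFuture<SettlementWaitSketch> settlement = new CompletableFuture<>();
    private volatile boolean settled;

    /** Marks this sketch as settled and releases any waiting thread. */
    public void settle() {
        settled = true;
        settlement.complete(this);
    }

    public boolean settled() {
        return settled;
    }

    /** Returns immediately when already settled, otherwise waits up to the timeout. */
    public SettlementWaitSketch awaitSettlement(long timeout, TimeUnit unit) throws Exception {
        if (settled) {
            return this; // Fast path: no need to touch the future.
        }
        try {
            return settlement.get(timeout, unit);
        } catch (InterruptedException ie) {
            // Sketch-specific choice: keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            throw new Exception("Wait for settlement was interrupted", ie);
        } catch (ExecutionException ee) {
            throw new Exception(ee.getCause());
        } catch (TimeoutException te) {
            throw new WaitTimedOutException("Timed out waiting for settlement", te);
        }
    }

    public static void main(String[] args) throws Exception {
        SettlementWaitSketch tracker = new SettlementWaitSketch();
        try {
            tracker.awaitSettlement(10, TimeUnit.MILLISECONDS);
        } catch (WaitTimedOutException expected) {
            System.out.println("timed out before settlement");
        }
        tracker.settle();
        System.out.println(tracker.awaitSettlement(10, TimeUnit.MILLISECONDS) == tracker);
    }
}
```
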
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
index be7c8c7d..0ca1bf38 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransactionContext.java
@@ -30,6 +30,31 @@ import org.apache.qpid.protonj2.types.transport.DeliveryState;
  */
 public interface ClientTransactionContext {
 
+    public interface Sendable {
+
+        /**
+         * Performs the actual send of delivery data, which might be enlisted in a transaction
+         * or may simply be passed through based on the context and its state. The sender need
+         * not be aware of this, as the context will provide a delivery state that is appropriate
+         * for this send and that encapsulates any sender provided delivery state.
+         *
+         * @param state
+         * 		Sender provided delivery state or context decorated version.
+         * @param settled
+         * 		If the send should be sent settled or not.
+         */
+        void send(DeliveryState state, boolean settled);
+
+        /**
+         * If the context that is overseeing this send is in a failed state, it can request that
+         * the send be discarded without notifying the sender that it failed. This occurs most
+         * often in an in-doubt transaction context, where all work will be dropped once the
+         * user attempts to retire the transaction.
+         */
+        void discard();
+
+    }
+
     /**
      * Begin a new transaction if one is not already in play.
      *
@@ -86,7 +111,7 @@ public interface ClientTransactionContext {
      * in a roll-back only state.  If the transaction is failed the context should discard the
      * envelope which should appear to the caller as if the send was successful.
      *
-     * @param envelope
+     * @param sendable
      *      The envelope containing the details and mechanisms for sending the message.
      * @param state
      *      The delivery state that is being applied as the outcome of the delivery.
@@ -95,7 +120,7 @@ public interface ClientTransactionContext {
      *
      * @return this {@link ClientTransactionContext} instance.
      */
-    ClientTransactionContext send(ClientOutgoingEnvelope envelope, DeliveryState state, boolean settled);
+    ClientTransactionContext send(Sendable sendable, DeliveryState state, boolean settled);
 
     /**
      * Apply a disposition to the given delivery wrapping it with a {@link TransactionalState} outcome
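
Editorial sketch of how a context might drive the Sendable callback introduced in the hunks above: forward the send while healthy, and discard silently once failed. This only illustrates the contract described in the Javadoc and makes several assumptions. SendableSketch, SimpleContext and RecordingSendable are hypothetical names, and Object stands in for the real DeliveryState type.

```java
import java.util.ArrayList;
import java.util.List;

public class SendableSketch {

    /** Same shape as the Sendable callback; Object replaces the delivery state type. */
    public interface Sendable {
        void send(Object state, boolean settled);

        void discard();
    }

    /** Hypothetical context that passes sends through until it is marked failed. */
    public static class SimpleContext {
        private boolean failed;

        public void markFailed() {
            failed = true;
        }

        public SimpleContext send(Sendable sendable, Object state, boolean settled) {
            if (failed) {
                // The caller is not told about the failure; the work is just dropped.
                sendable.discard();
            } else {
                sendable.send(state, settled);
            }
            return this;
        }
    }

    /** Records which callback ran so the behaviour can be observed. */
    public static class RecordingSendable implements Sendable {
        public final List<String> events = new ArrayList<>();

        @Override
        public void send(Object state, boolean settled) {
            events.add("send settled=" + settled);
        }

        @Override
        public void discard() {
            events.add("discard");
        }
    }

    public static void main(String[] args) {
        SimpleContext context = new SimpleContext();
        RecordingSendable sendable = new RecordingSendable();

        context.send(sendable, null, false);
        context.markFailed();
        context.send(sendable, null, false);

        System.out.println(sendable.events); // prints [send settled=false, discard]
    }
}
```
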
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
index d9ebdbd5..b946fcca 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/SenderTest.java
@@ -910,7 +910,7 @@ public class SenderTest extends ImperativeClientTestCase {
             }
 
             assertNotNull(tracker);
-            assertNotNull(tracker.settlementFuture().get(5, TimeUnit.SECONDS));
+            assertTrue(tracker.settlementFuture().get(5, TimeUnit.SECONDS).remoteSettled());
             assertEquals(tracker.remoteState().getType(), DeliveryState.Type.ACCEPTED);
             assertNull(tracker.state());
             assertFalse(tracker.settled());
@@ -977,8 +977,9 @@ public class SenderTest extends ImperativeClientTestCase {
             }
 
             assertNotNull(tracker);
-            assertNotNull(tracker.settlementFuture().isDone());
-            assertNotNull(tracker.settlementFuture().get().settled());
+            assertTrue(tracker.settlementFuture().isDone());
+            assertTrue(tracker.settlementFuture().get().settled());
+            assertFalse(tracker.settlementFuture().get().remoteSettled());
 
             sender.closeAsync().get(10, TimeUnit.SECONDS);
 
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
index d70ec7c1..2ca78f0a 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamReceiverTest.java
@@ -49,7 +49,6 @@ import org.apache.qpid.protonj2.client.Connection;
 import org.apache.qpid.protonj2.client.ConnectionOptions;
 import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.ErrorCondition;
-import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.ReceiverOptions;
 import org.apache.qpid.protonj2.client.SenderOptions;
 import org.apache.qpid.protonj2.client.StreamDelivery;
@@ -3288,7 +3287,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
             StreamReceiverOptions receiverOptions = new StreamReceiverOptions().drainTimeout(15);
-            Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
+            StreamReceiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
 
             try {
                 receiver.drain().get();
@@ -3323,7 +3322,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
             Client container = Client.create();
             ConnectionOptions connectionOptions = new ConnectionOptions().drainTimeout(20);
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectionOptions);
-            Receiver receiver = connection.openStreamReceiver("test-queue").openFuture().get();
+            StreamReceiver receiver = connection.openStreamReceiver("test-queue").openFuture().get();
 
             try {
                 receiver.drain().get();
@@ -3354,12 +3353,12 @@ class StreamReceiverTest extends ImperativeClientTestCase {
 
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
-            Receiver receiver = connection.openStreamReceiver("test-queue", new StreamReceiverOptions().creditWindow(0));
+            StreamReceiver receiver = connection.openStreamReceiver("test-queue", new StreamReceiverOptions().creditWindow(0));
             receiver.openFuture().get(5, TimeUnit.SECONDS);
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
 
-            Future<? extends Receiver> draining = receiver.drain();
+            Future<StreamReceiver> draining = receiver.drain();
             draining.get(5, TimeUnit.SECONDS);
 
             // Close things down
@@ -3389,7 +3388,7 @@ class StreamReceiverTest extends ImperativeClientTestCase {
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
             StreamReceiverOptions receiverOptions = new StreamReceiverOptions();
-            Receiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
+            StreamReceiver receiver = connection.openStreamReceiver("test-queue", receiverOptions).openFuture().get();
 
             receiver.drain();
 
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 25b564af..89969162 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -56,11 +56,12 @@ import org.apache.qpid.protonj2.client.Session;
 import org.apache.qpid.protonj2.client.StreamSender;
 import org.apache.qpid.protonj2.client.StreamSenderMessage;
 import org.apache.qpid.protonj2.client.StreamSenderOptions;
-import org.apache.qpid.protonj2.client.Tracker;
+import org.apache.qpid.protonj2.client.StreamTracker;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException;
 import org.apache.qpid.protonj2.client.test.ImperativeClientTestCase;
+import org.apache.qpid.protonj2.client.test.Wait;
 import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher;
 import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher;
@@ -162,7 +163,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
 
             Message<String> message = Message.create("Hello World");
 
-            final Tracker tracker;
+            final StreamTracker tracker;
             if (trySend) {
                 if (addDeliveryAnnotations) {
                     tracker = sender.trySend(message, deliveryAnnotations);
@@ -376,8 +377,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
 
             assertNotNull(message.tracker());
             assertEquals(17, message.messageFormat());
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
             assertThrows(ClientIllegalStateException.class, () -> message.addBodySection(new AmqpValue<>("three")));
             assertThrows(ClientIllegalStateException.class, () -> message.body());
             assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
@@ -448,8 +449,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
 
             message.complete();
 
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            assertTrue(message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
             assertThrows(ClientIllegalStateException.class, () -> message.body());
             assertThrows(ClientIllegalStateException.class, () -> message.rawOutputStream());
 
@@ -530,7 +531,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
 
-            StreamSender sender = (StreamSender) connection.openStreamSender("test-qos").openFuture().get();
+            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
             StreamSenderMessage message = sender.beginMessage();
 
             try {
@@ -571,7 +572,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
 
-            StreamSender sender = (StreamSender) connection.openStreamSender("test-qos").openFuture().get();
+            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
             StreamSenderMessage message = sender.beginMessage();
 
             try {
@@ -612,7 +613,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             Client container = Client.create();
             Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort()).openFuture().get();
 
-            StreamSender sender = (StreamSender) connection.openStreamSender("test-qos").openFuture().get();
+            StreamSender sender = connection.openStreamSender("test-qos").openFuture().get();
             StreamSenderMessage message = sender.beginMessage();
 
             message.durable(true);
@@ -1658,8 +1659,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.expectClose().respond();
 
             assertNotNull(message.tracker());
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            assertFalse(message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
 
             sender.closeAsync().get(10, TimeUnit.SECONDS);
 
@@ -1750,8 +1751,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.expectClose().respond();
 
             assertNotNull(message.tracker());
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            assertFalse(message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
 
             sender.closeAsync().get(10, TimeUnit.SECONDS);
 
@@ -1841,8 +1842,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.expectClose().respond();
 
             assertNotNull(message.tracker());
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            Wait.assertTrue(() -> message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
 
             sender.closeAsync().get(10, TimeUnit.SECONDS);
 
@@ -1946,7 +1947,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             ForkJoinPool.commonPool().execute(() -> {
                 try {
                     LOG.info("Test send 1 is preparing to fire:");
-                    Tracker tracker = sender.send(Message.create(payload));
+                    StreamTracker tracker = sender.send(Message.create(payload));
                     tracker.awaitSettlement(10, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.info("Test send 1 failed with error: ", e);
@@ -1957,7 +1958,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             ForkJoinPool.commonPool().execute(() -> {
                 try {
                     LOG.info("Test send 2 is preparing to fire:");
-                    Tracker tracker = sender.send(Message.create(payload));
+                    StreamTracker tracker = sender.send(Message.create(payload));
                     tracker.awaitSettlement(10, TimeUnit.SECONDS);
                 } catch (Exception e) {
                     LOG.info("Test send 2 failed with error: ", e);
@@ -2018,7 +2019,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
                 try {
                     assertTrue(send1Started.await(10, TimeUnit.SECONDS));
                     LOG.info("Test send 2 is preparing to fire:");
-                    Tracker tracker = sender.send(Message.create(payload));
+                    StreamTracker tracker = sender.send(Message.create(payload));
                     tracker.awaitSettlement(10, TimeUnit.SECONDS);
                     send2Completed.countDown();
                 } catch (Exception e) {
@@ -2089,7 +2090,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
                 try {
                     assertTrue(send1Started.await(10, TimeUnit.SECONDS));
                     LOG.info("Test send 2 is preparing to fire:");
-                    Tracker tracker = sender.send(Message.create(payload));
+                    StreamTracker tracker = sender.send(Message.create(payload));
                     tracker.awaitSettlement(10, TimeUnit.SECONDS);
                     send2Completed.countDown();
                 } catch (Exception e) {
@@ -2777,8 +2778,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             message.complete();
 
             assertEquals(message, message.complete()); // Should no-op at this point
-            assertNotNull(message.tracker().settlementFuture().isDone());
-            assertNotNull(message.tracker().settlementFuture().get().settled());
+            Wait.assertTrue(() ->message.tracker().settlementFuture().isDone());
+            assertTrue(message.tracker().settlementFuture().get().settled());
 
             sender.closeAsync().get(10, TimeUnit.SECONDS);
 

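Editorial sketch of why the test changes above replace assertNotNull(...) with assertTrue/assertFalse: a primitive boolean passed to a method that takes Object is autoboxed to a non-null Boolean, so a not-null check on it can never fail. The helper names below are hypothetical stand-ins that mimic the two assertion styles without depending on JUnit.

```java
import java.util.concurrent.CompletableFuture;

public class BoxedAssertionSketch {

    /** Mimics assertNotNull(Object): passes for any non-null reference. */
    public static boolean notNullCheckPasses(Object value) {
        return value != null;
    }

    /** Mimics assertTrue(boolean): passes only when the condition holds. */
    public static boolean isTrueCheckPasses(boolean condition) {
        return condition;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();

        // isDone() is false here, but it is boxed to Boolean.FALSE, which is not null.
        System.out.println(notNullCheckPasses(pending.isDone())); // prints true
        // Checking the value itself exposes that the future is not done.
        System.out.println(isTrueCheckPasses(pending.isDone()));  // prints false
    }
}
```
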

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-protonj2] 02/02: PROTON-2533 Update netty tc-natives to 2.0.52.Final

Posted by ta...@apache.org.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git

commit ce2f829480b8a4394c632a513d1794aa41cb628a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri May 6 14:45:07 2022 -0400

    PROTON-2533 Update netty tc-natives to 2.0.52.Final
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 54aa10d0..fcf22961 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     <hamcrest.version>2.2</hamcrest.version>
     <netty.version>4.1.76.Final</netty.version>
     <netty.iouring.version>0.0.13.Final</netty.iouring.version>
-    <netty.tcnative.version>2.0.51.Final</netty.tcnative.version>
+    <netty.tcnative.version>2.0.52.Final</netty.tcnative.version>
 
     <!-- Plugin versions -->
     <maven.bundle.plugin.version>5.1.1</maven.bundle.plugin.version>

