You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/19 20:03:02 UTC

[qpid-protonj2] branch main updated: PROTON-2541 Provide a common base for delivery types

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new fd25cf47 PROTON-2541 Provide a common base for delivery types
fd25cf47 is described below

commit fd25cf47eb6c5355dc7c0b71a8d4e3e1bd2d1f41
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Thu May 19 15:56:47 2022 -0400

    PROTON-2541 Provide a common base for delivery types
    
    pulls up the common bits of delivery types into a utility base type.
---
 .../protonj2/client/impl/ClientDeliverable.java    | 100 +++++++++++++++++++++
 .../qpid/protonj2/client/impl/ClientDelivery.java  |  95 +++-----------------
 .../protonj2/client/impl/ClientStreamDelivery.java |  75 ++--------------
 .../client/impl/ClientStreamReceiverMessage.java   |   2 +-
 4 files changed, 120 insertions(+), 152 deletions(-)

diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java
new file mode 100644
index 00000000..88c95066
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.types.messaging.Accepted;
+import org.apache.qpid.protonj2.types.messaging.Modified;
+import org.apache.qpid.protonj2.types.messaging.Rejected;
+import org.apache.qpid.protonj2.types.messaging.Released;
+import org.apache.qpid.protonj2.types.transport.ErrorCondition;
+
+/**
+ * Abstract type that implements some of the common portions of a delivery
+ * wrapper type.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class ClientDeliverable<DeliveryType, ReceiverType extends ClientReceiverLinkType> {
+
+    protected final ReceiverType receiver;
+    protected final IncomingDelivery delivery;
+
+    ClientDeliverable(ReceiverType receiver, IncomingDelivery delivery) {
+        this.receiver = receiver;
+        this.delivery = delivery;
+        this.delivery.setLinkedResource(self());
+    }
+
+    protected abstract DeliveryType self();
+
+    IncomingDelivery protonDelivery() {
+        return delivery;
+    }
+
+    public DeliveryType accept() throws ClientException {
+        receiver.disposition(delivery, Accepted.getInstance(), true);
+        return self();
+    }
+
+    public DeliveryType release() throws ClientException {
+        receiver.disposition(delivery, Released.getInstance(), true);
+        return self();
+    }
+
+    public DeliveryType reject(String condition, String description) throws ClientException {
+        receiver.disposition(delivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
+        return self();
+    }
+
+    public DeliveryType modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
+        receiver.disposition(delivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
+        return self();
+    }
+
+    public DeliveryType disposition(DeliveryState state, boolean settle) throws ClientException {
+        receiver.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
+        return self();
+    }
+
+    public DeliveryType settle() throws ClientException {
+        receiver.disposition(delivery, null, true);
+        return self();
+    }
+
+    public DeliveryState state() {
+        return ClientDeliveryState.fromProtonType(delivery.getState());
+    }
+
+    public DeliveryState remoteState() {
+        return ClientDeliveryState.fromProtonType(delivery.getRemoteState());
+    }
+
+    public boolean remoteSettled() {
+        return delivery.isRemotelySettled();
+    }
+
+    public int messageFormat() {
+        return delivery.getMessageFormat();
+    }
+
+    public boolean settled() {
+        return delivery.isSettled();
+    }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
index e790c1b7..0f25a50c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
@@ -22,27 +22,19 @@ import java.util.Map;
 import org.apache.qpid.protonj2.buffer.ProtonBuffer;
 import org.apache.qpid.protonj2.buffer.ProtonBufferInputStream;
 import org.apache.qpid.protonj2.client.Delivery;
-import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.Message;
 import org.apache.qpid.protonj2.client.Receiver;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
 import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
 import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.engine.util.StringUtils;
-import org.apache.qpid.protonj2.types.messaging.Accepted;
 import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
-import org.apache.qpid.protonj2.types.messaging.Modified;
-import org.apache.qpid.protonj2.types.messaging.Rejected;
-import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.ErrorCondition;
 
 /**
  * Client inbound delivery object.
  */
-public final class ClientDelivery implements Delivery {
+public final class ClientDelivery extends ClientDeliverable<ClientDelivery, ClientReceiver> implements Delivery {
 
-    private final ClientReceiver receiver;
-    private final IncomingDelivery delivery;
     private final ProtonBuffer payload;
 
     private DeliveryAnnotations deliveryAnnotations;
@@ -59,12 +51,21 @@ public final class ClientDelivery implements Delivery {
      *      The proton incoming delivery that backs this client delivery facade.
      */
     ClientDelivery(ClientReceiver receiver, IncomingDelivery delivery) {
-        this.receiver = receiver;
-        this.delivery = delivery;
-        this.delivery.setLinkedResource(this);
+        super(receiver, delivery);
+
         this.payload = delivery.readAll();
     }
 
+    @Override
+    protected ClientDelivery self() {
+        return this;
+    }
+
+    @Override
+    public Receiver receiver() {
+        return receiver;
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public <E> Message<E> message() throws ClientException {
@@ -104,78 +105,8 @@ public final class ClientDelivery implements Delivery {
         }
     }
 
-    @Override
-    public Delivery accept() throws ClientException {
-        receiver.disposition(delivery, Accepted.getInstance(), true);
-        return this;
-    }
-
-    @Override
-    public Delivery release() throws ClientException {
-        receiver.disposition(delivery, Released.getInstance(), true);
-        return this;
-    }
-
-    @Override
-    public Delivery reject(String condition, String description) throws ClientException {
-        receiver.disposition(delivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
-        return this;
-    }
-
-    @Override
-    public Delivery modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
-        receiver.disposition(delivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
-        return this;
-    }
-
-    @Override
-    public Delivery disposition(DeliveryState state, boolean settle) throws ClientException {
-        receiver.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
-        return this;
-    }
-
-    @Override
-    public Delivery settle() throws ClientException {
-        receiver.disposition(delivery, null, true);
-        return this;
-    }
-
-    @Override
-    public DeliveryState state() {
-        return ClientDeliveryState.fromProtonType(delivery.getState());
-    }
-
-    @Override
-    public DeliveryState remoteState() {
-        return ClientDeliveryState.fromProtonType(delivery.getRemoteState());
-    }
-
-    @Override
-    public boolean remoteSettled() {
-        return delivery.isRemotelySettled();
-    }
-
-    @Override
-    public int messageFormat() {
-        return delivery.getMessageFormat();
-    }
-
-    @Override
-    public Receiver receiver() {
-        return receiver;
-    }
-
-    @Override
-    public boolean settled() {
-        return delivery.isSettled();
-    }
-
     //----- Internal API not meant to be used from outside the client package.
 
-    IncomingDelivery protonDelivery() {
-        return delivery;
-    }
-
     void deliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
         this.deliveryAnnotations = deliveryAnnotations;
     }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
index f6e8158a..bbbfa652 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.protonj2.buffer.ProtonCompositeBuffer;
-import org.apache.qpid.protonj2.client.DeliveryState;
 import org.apache.qpid.protonj2.client.StreamDelivery;
 import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
 import org.apache.qpid.protonj2.client.exceptions.ClientException;
@@ -36,10 +35,6 @@ import org.apache.qpid.protonj2.engine.IncomingDelivery;
 import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
 import org.apache.qpid.protonj2.engine.util.StringUtils;
 import org.apache.qpid.protonj2.types.messaging.Accepted;
-import org.apache.qpid.protonj2.types.messaging.Modified;
-import org.apache.qpid.protonj2.types.messaging.Rejected;
-import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.ErrorCondition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +42,7 @@ import org.slf4j.LoggerFactory;
  * A {@link StreamDelivery} implementation that provides the mechanics of reading message
  * types from an incoming split framed transfer.
  */
-public final class ClientStreamDelivery implements StreamDelivery {
+public final class ClientStreamDelivery extends ClientDeliverable<ClientStreamDelivery, ClientStreamReceiver> implements StreamDelivery {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientStreamDelivery.class);
 
@@ -58,6 +53,8 @@ public final class ClientStreamDelivery implements StreamDelivery {
     private RawDeliveryInputStream rawInputStream;
 
     ClientStreamDelivery(ClientStreamReceiver receiver, IncomingDelivery protonDelivery) {
+        super(receiver, protonDelivery);
+
         this.receiver = receiver;
         this.protonDelivery = protonDelivery.setLinkedResource(this);
 
@@ -69,8 +66,9 @@ public final class ClientStreamDelivery implements StreamDelivery {
                       .deliveryAbortedHandler(this::handleDeliveryAborted);
     }
 
-    IncomingDelivery getProtonDelivery() {
-        return protonDelivery;
+    @Override
+    protected ClientStreamDelivery self() {
+        return this;
     }
 
     @Override
@@ -88,11 +86,6 @@ public final class ClientStreamDelivery implements StreamDelivery {
         return !protonDelivery.isPartial();
     }
 
-    @Override
-    public int messageFormat() {
-        return protonDelivery.getMessageFormat();
-    }
-
     @Override
     public ClientStreamReceiverMessage message() throws ClientException {
         if (rawInputStream != null && message == null) {
@@ -128,62 +121,6 @@ public final class ClientStreamDelivery implements StreamDelivery {
         return rawInputStream;
     }
 
-    @Override
-    public StreamDelivery accept() throws ClientException {
-        receiver.disposition(protonDelivery, Accepted.getInstance(), true);
-        return this;
-    }
-
-    @Override
-    public StreamDelivery release() throws ClientException {
-        receiver.disposition(protonDelivery, Released.getInstance(), true);
-        return this;
-    }
-
-    @Override
-    public StreamDelivery reject(String condition, String description) throws ClientException {
-        receiver.disposition(protonDelivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
-        return this;
-    }
-
-    @Override
-    public StreamDelivery modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
-        receiver.disposition(protonDelivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
-        return this;
-    }
-
-    @Override
-    public StreamDelivery disposition(DeliveryState state, boolean settle) throws ClientException {
-        receiver.disposition(protonDelivery, ClientDeliveryState.asProtonType(state), settle);
-        return this;
-    }
-
-    @Override
-    public StreamDelivery settle() throws ClientException {
-        receiver.disposition(protonDelivery, null, true);
-        return this;
-    }
-
-    @Override
-    public DeliveryState state() {
-        return ClientDeliveryState.fromProtonType(protonDelivery.getState());
-    }
-
-    @Override
-    public boolean settled() {
-        return protonDelivery.isSettled();
-    }
-
-    @Override
-    public DeliveryState remoteState() {
-        return ClientDeliveryState.fromProtonType(protonDelivery.getRemoteState());
-    }
-
-    @Override
-    public boolean remoteSettled() {
-        return protonDelivery.isRemotelySettled();
-    }
-
     //----- Event Handlers for Delivery updates
 
     void handleDeliveryRead(IncomingDelivery delivery) {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
index 269a00a1..ac6a44d3 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
@@ -98,7 +98,7 @@ public final class ClientStreamReceiverMessage implements StreamReceiverMessage
         this.receiver = receiver;
         this.delivery = delivery;
         this.deliveryStream = deliveryStream;
-        this.protonDelivery = delivery.getProtonDelivery();
+        this.protonDelivery = delivery.protonDelivery();
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org