You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/05/19 20:03:02 UTC
[qpid-protonj2] branch main updated: PROTON-2541 Provide a common base for delivery types
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new fd25cf47 PROTON-2541 Provide a common base for delivery types
fd25cf47 is described below
commit fd25cf47eb6c5355dc7c0b71a8d4e3e1bd2d1f41
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Thu May 19 15:56:47 2022 -0400
PROTON-2541 Provide a common base for delivery types
pulls up the common bits of delivery types into a utility base type.
---
.../protonj2/client/impl/ClientDeliverable.java | 100 +++++++++++++++++++++
.../qpid/protonj2/client/impl/ClientDelivery.java | 95 +++-----------------
.../protonj2/client/impl/ClientStreamDelivery.java | 75 ++--------------
.../client/impl/ClientStreamReceiverMessage.java | 2 +-
4 files changed, 120 insertions(+), 152 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java
new file mode 100644
index 00000000..88c95066
--- /dev/null
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDeliverable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.client.impl;
+
+import org.apache.qpid.protonj2.client.DeliveryState;
+import org.apache.qpid.protonj2.client.exceptions.ClientException;
+import org.apache.qpid.protonj2.engine.IncomingDelivery;
+import org.apache.qpid.protonj2.types.messaging.Accepted;
+import org.apache.qpid.protonj2.types.messaging.Modified;
+import org.apache.qpid.protonj2.types.messaging.Rejected;
+import org.apache.qpid.protonj2.types.messaging.Released;
+import org.apache.qpid.protonj2.types.transport.ErrorCondition;
+
+/**
+ * Abstract type that implements some of the common portions of a delivery
+ * wrapper type.
+ */
+@SuppressWarnings("rawtypes")
+public abstract class ClientDeliverable<DeliveryType, ReceiverType extends ClientReceiverLinkType> {
+
+ protected final ReceiverType receiver;
+ protected final IncomingDelivery delivery;
+
+ ClientDeliverable(ReceiverType receiver, IncomingDelivery delivery) {
+ this.receiver = receiver;
+ this.delivery = delivery;
+ this.delivery.setLinkedResource(self());
+ }
+
+ protected abstract DeliveryType self();
+
+ IncomingDelivery protonDelivery() {
+ return delivery;
+ }
+
+ public DeliveryType accept() throws ClientException {
+ receiver.disposition(delivery, Accepted.getInstance(), true);
+ return self();
+ }
+
+ public DeliveryType release() throws ClientException {
+ receiver.disposition(delivery, Released.getInstance(), true);
+ return self();
+ }
+
+ public DeliveryType reject(String condition, String description) throws ClientException {
+ receiver.disposition(delivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
+ return self();
+ }
+
+ public DeliveryType modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
+ receiver.disposition(delivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
+ return self();
+ }
+
+ public DeliveryType disposition(DeliveryState state, boolean settle) throws ClientException {
+ receiver.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
+ return self();
+ }
+
+ public DeliveryType settle() throws ClientException {
+ receiver.disposition(delivery, null, true);
+ return self();
+ }
+
+ public DeliveryState state() {
+ return ClientDeliveryState.fromProtonType(delivery.getState());
+ }
+
+ public DeliveryState remoteState() {
+ return ClientDeliveryState.fromProtonType(delivery.getRemoteState());
+ }
+
+ public boolean remoteSettled() {
+ return delivery.isRemotelySettled();
+ }
+
+ public int messageFormat() {
+ return delivery.getMessageFormat();
+ }
+
+ public boolean settled() {
+ return delivery.isSettled();
+ }
+}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
index e790c1b7..0f25a50c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientDelivery.java
@@ -22,27 +22,19 @@ import java.util.Map;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.buffer.ProtonBufferInputStream;
import org.apache.qpid.protonj2.client.Delivery;
-import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.Message;
import org.apache.qpid.protonj2.client.Receiver;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.util.StringUtils;
-import org.apache.qpid.protonj2.types.messaging.Accepted;
import org.apache.qpid.protonj2.types.messaging.DeliveryAnnotations;
-import org.apache.qpid.protonj2.types.messaging.Modified;
-import org.apache.qpid.protonj2.types.messaging.Rejected;
-import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.ErrorCondition;
/**
* Client inbound delivery object.
*/
-public final class ClientDelivery implements Delivery {
+public final class ClientDelivery extends ClientDeliverable<ClientDelivery, ClientReceiver> implements Delivery {
- private final ClientReceiver receiver;
- private final IncomingDelivery delivery;
private final ProtonBuffer payload;
private DeliveryAnnotations deliveryAnnotations;
@@ -59,12 +51,21 @@ public final class ClientDelivery implements Delivery {
* The proton incoming delivery that backs this client delivery facade.
*/
ClientDelivery(ClientReceiver receiver, IncomingDelivery delivery) {
- this.receiver = receiver;
- this.delivery = delivery;
- this.delivery.setLinkedResource(this);
+ super(receiver, delivery);
+
this.payload = delivery.readAll();
}
+ @Override
+ protected ClientDelivery self() {
+ return this;
+ }
+
+ @Override
+ public Receiver receiver() {
+ return receiver;
+ }
+
@SuppressWarnings("unchecked")
@Override
public <E> Message<E> message() throws ClientException {
@@ -104,78 +105,8 @@ public final class ClientDelivery implements Delivery {
}
}
- @Override
- public Delivery accept() throws ClientException {
- receiver.disposition(delivery, Accepted.getInstance(), true);
- return this;
- }
-
- @Override
- public Delivery release() throws ClientException {
- receiver.disposition(delivery, Released.getInstance(), true);
- return this;
- }
-
- @Override
- public Delivery reject(String condition, String description) throws ClientException {
- receiver.disposition(delivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
- return this;
- }
-
- @Override
- public Delivery modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
- receiver.disposition(delivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
- return this;
- }
-
- @Override
- public Delivery disposition(DeliveryState state, boolean settle) throws ClientException {
- receiver.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
- return this;
- }
-
- @Override
- public Delivery settle() throws ClientException {
- receiver.disposition(delivery, null, true);
- return this;
- }
-
- @Override
- public DeliveryState state() {
- return ClientDeliveryState.fromProtonType(delivery.getState());
- }
-
- @Override
- public DeliveryState remoteState() {
- return ClientDeliveryState.fromProtonType(delivery.getRemoteState());
- }
-
- @Override
- public boolean remoteSettled() {
- return delivery.isRemotelySettled();
- }
-
- @Override
- public int messageFormat() {
- return delivery.getMessageFormat();
- }
-
- @Override
- public Receiver receiver() {
- return receiver;
- }
-
- @Override
- public boolean settled() {
- return delivery.isSettled();
- }
-
//----- Internal API not meant to be used from outside the client package.
- IncomingDelivery protonDelivery() {
- return delivery;
- }
-
void deliveryAnnotations(DeliveryAnnotations deliveryAnnotations) {
this.deliveryAnnotations = deliveryAnnotations;
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
index f6e8158a..bbbfa652 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamDelivery.java
@@ -25,7 +25,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protonj2.buffer.ProtonCompositeBuffer;
-import org.apache.qpid.protonj2.client.DeliveryState;
import org.apache.qpid.protonj2.client.StreamDelivery;
import org.apache.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
@@ -36,10 +35,6 @@ import org.apache.qpid.protonj2.engine.IncomingDelivery;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.util.StringUtils;
import org.apache.qpid.protonj2.types.messaging.Accepted;
-import org.apache.qpid.protonj2.types.messaging.Modified;
-import org.apache.qpid.protonj2.types.messaging.Rejected;
-import org.apache.qpid.protonj2.types.messaging.Released;
-import org.apache.qpid.protonj2.types.transport.ErrorCondition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +42,7 @@ import org.slf4j.LoggerFactory;
* A {@link StreamDelivery} implementation that provides the mechanics of reading message
* types from an incoming split framed transfer.
*/
-public final class ClientStreamDelivery implements StreamDelivery {
+public final class ClientStreamDelivery extends ClientDeliverable<ClientStreamDelivery, ClientStreamReceiver> implements StreamDelivery {
private static final Logger LOG = LoggerFactory.getLogger(ClientStreamDelivery.class);
@@ -58,6 +53,8 @@ public final class ClientStreamDelivery implements StreamDelivery {
private RawDeliveryInputStream rawInputStream;
ClientStreamDelivery(ClientStreamReceiver receiver, IncomingDelivery protonDelivery) {
+ super(receiver, protonDelivery);
+
this.receiver = receiver;
this.protonDelivery = protonDelivery.setLinkedResource(this);
@@ -69,8 +66,9 @@ public final class ClientStreamDelivery implements StreamDelivery {
.deliveryAbortedHandler(this::handleDeliveryAborted);
}
- IncomingDelivery getProtonDelivery() {
- return protonDelivery;
+ @Override
+ protected ClientStreamDelivery self() {
+ return this;
}
@Override
@@ -88,11 +86,6 @@ public final class ClientStreamDelivery implements StreamDelivery {
return !protonDelivery.isPartial();
}
- @Override
- public int messageFormat() {
- return protonDelivery.getMessageFormat();
- }
-
@Override
public ClientStreamReceiverMessage message() throws ClientException {
if (rawInputStream != null && message == null) {
@@ -128,62 +121,6 @@ public final class ClientStreamDelivery implements StreamDelivery {
return rawInputStream;
}
- @Override
- public StreamDelivery accept() throws ClientException {
- receiver.disposition(protonDelivery, Accepted.getInstance(), true);
- return this;
- }
-
- @Override
- public StreamDelivery release() throws ClientException {
- receiver.disposition(protonDelivery, Released.getInstance(), true);
- return this;
- }
-
- @Override
- public StreamDelivery reject(String condition, String description) throws ClientException {
- receiver.disposition(protonDelivery, new Rejected().setError(new ErrorCondition(condition, description)), true);
- return this;
- }
-
- @Override
- public StreamDelivery modified(boolean deliveryFailed, boolean undeliverableHere) throws ClientException {
- receiver.disposition(protonDelivery, new Modified().setDeliveryFailed(deliveryFailed).setUndeliverableHere(undeliverableHere), true);
- return this;
- }
-
- @Override
- public StreamDelivery disposition(DeliveryState state, boolean settle) throws ClientException {
- receiver.disposition(protonDelivery, ClientDeliveryState.asProtonType(state), settle);
- return this;
- }
-
- @Override
- public StreamDelivery settle() throws ClientException {
- receiver.disposition(protonDelivery, null, true);
- return this;
- }
-
- @Override
- public DeliveryState state() {
- return ClientDeliveryState.fromProtonType(protonDelivery.getState());
- }
-
- @Override
- public boolean settled() {
- return protonDelivery.isSettled();
- }
-
- @Override
- public DeliveryState remoteState() {
- return ClientDeliveryState.fromProtonType(protonDelivery.getRemoteState());
- }
-
- @Override
- public boolean remoteSettled() {
- return protonDelivery.isRemotelySettled();
- }
-
//----- Event Handlers for Delivery updates
void handleDeliveryRead(IncomingDelivery delivery) {
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
index 269a00a1..ac6a44d3 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamReceiverMessage.java
@@ -98,7 +98,7 @@ public final class ClientStreamReceiverMessage implements StreamReceiverMessage
this.receiver = receiver;
this.delivery = delivery;
this.deliveryStream = deliveryStream;
- this.protonDelivery = delivery.getProtonDelivery();
+ this.protonDelivery = delivery.protonDelivery();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org