You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/11/12 16:35:24 UTC

qpid-jms git commit: QPIDJMS-429 Make use of newer proton-j APIs for send and decode

Repository: qpid-jms
Updated Branches:
  refs/heads/master 7b0aabfa9 -> aa424cab9


QPIDJMS-429 Make use of newer proton-j APIs for send and decode

Use newer APIs to clean up some send code that handles dispositions and
use new section type enums to simplify the message decoding portion of
the codec.  Also implement the WritableBuffer ensureRemaining method
to better handle writes where the buffer needs to grow.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/aa424cab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/aa424cab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/aa424cab

Branch: refs/heads/master
Commit: aa424cab9b26c39b43fc6b45c53b1ec7e703ef42
Parents: 7b0aabf
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Nov 12 11:31:34 2018 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Nov 12 11:31:39 2018 -0500

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    | 61 +++++++-------
 .../jms/provider/amqp/message/AmqpCodec.java    | 89 ++++++--------------
 .../amqp/message/AmqpWritableBuffer.java        |  5 ++
 3 files changed, 63 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa424cab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 4b2e5b1..c3010d2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -34,13 +34,10 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.provider.amqp.message.AmqpReadableBuffer;
 import org.apache.qpid.jms.util.IOExceptionSupport;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
@@ -211,54 +208,58 @@ public class AmqpFixedProducer extends AmqpProducer {
     public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
         DeliveryState state = delivery.getRemoteState();
         if (state != null) {
-
             InFlightSend send = (InFlightSend) delivery.getContext();
 
-            if (state instanceof Accepted) {
+            if (state.getType() == DeliveryStateType.Accepted) {
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
                 send.onSuccess();
-                super.processDeliveryUpdates(provider, delivery);
-                return;
+            } else {
+                applyDeliveryStateUpdate(send, delivery, state);
             }
+        }
 
-            Exception deliveryError = null;
-            Outcome outcome = null;
+        super.processDeliveryUpdates(provider, delivery);
+    }
 
-            if (state instanceof TransactionalState) {
-                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
-                outcome = ((TransactionalState) state).getOutcome();
-            } else if (state instanceof Outcome) {
-                outcome = (Outcome) state;
-            } else {
-                LOG.warn("Message send updated with unsupported state: {}", state);
-                outcome = null;
-            }
+    private void applyDeliveryStateUpdate(InFlightSend send, Delivery delivery, DeliveryState state) {
+        Exception deliveryError = null;
+        if (state == null) {
+            return;
+        }
 
-            if (outcome instanceof Accepted) {
+        switch (state.getType()) {
+            case Transactional:
+                LOG.trace("State of delivery is Transactional, retrieving outcome: {}", state);
+                applyDeliveryStateUpdate(send, delivery, (DeliveryState) ((TransactionalState) state).getOutcome());
+                break;
+            case Accepted:
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
                 send.onSuccess();
-            } else if (outcome instanceof Rejected) {
+                break;
+            case Rejected:
                 LOG.trace("Outcome of delivery was rejected: {}", delivery);
-                ErrorCondition remoteError = ((Rejected) outcome).getError();
+                ErrorCondition remoteError = ((Rejected) state).getError();
                 if (remoteError == null) {
                     remoteError = getEndpoint().getRemoteCondition();
                 }
 
                 deliveryError = AmqpSupport.convertToException(getParent().getProvider(), getEndpoint(), remoteError);
-            } else if (outcome instanceof Released) {
+                break;
+            case Released:
                 LOG.trace("Outcome of delivery was released: {}", delivery);
                 deliveryError = new JMSException("Delivery failed: released by receiver");
-            } else if (outcome instanceof Modified) {
+                break;
+            case Modified:
                 LOG.trace("Outcome of delivery was modified: {}", delivery);
                 deliveryError = new JMSException("Delivery failed: failure at remote");
-            }
-
-            if (deliveryError != null) {
-                send.onFailure(deliveryError);
-            }
+                break;
+            default:
+                LOG.warn("Message send updated with unsupported state: {}", state);
         }
 
-        super.processDeliveryUpdates(provider, delivery);
+        if (deliveryError != null) {
+            send.onFailure(deliveryError);
+        }
     }
 
     public AmqpSession getSession() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa424cab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
index fc68747..7a75d25 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodec.java
@@ -295,71 +295,36 @@ public final class AmqpCodec {
         Footer footer = null;
         Section section = null;
 
-        if (messageBytes.hasRemaining()) {
+        while (messageBytes.hasRemaining()) {
             section = (Section) decoder.readObject();
-        }
-
-        if (section instanceof Header) {
-            header = (Header) section;
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof DeliveryAnnotations) {
-            deliveryAnnotations = (DeliveryAnnotations) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof MessageAnnotations) {
-            messageAnnotations = (MessageAnnotations) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section instanceof Properties) {
-            properties = (Properties) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
 
-        }
-        if (section instanceof ApplicationProperties) {
-            applicationProperties = (ApplicationProperties) section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
-            }
-
-        }
-        if (section != null && !(section instanceof Footer)) {
-            body = section;
-
-            if (messageBytes.hasRemaining()) {
-                section = (Section) decoder.readObject();
-            } else {
-                section = null;
+            switch (section.getType()) {
+                case Header:
+                    header = (Header) section;
+                    break;
+                case DeliveryAnnotations:
+                    deliveryAnnotations = (DeliveryAnnotations) section;
+                    break;
+                case MessageAnnotations:
+                    messageAnnotations = (MessageAnnotations) section;
+                    break;
+                case Properties:
+                    properties = (Properties) section;
+                    break;
+                case ApplicationProperties:
+                    applicationProperties = (ApplicationProperties) section;
+                    break;
+                case Data:
+                case AmqpSequence:
+                case AmqpValue:
+                    body = section;
+                    break;
+                case Footer:
+                    footer = (Footer) section;
+                    break;
+                default:
+                    throw new IOException("Unknown Message Section forced decode abort.");
             }
-
-        }
-        if (section instanceof Footer) {
-            footer = (Footer) section;
         }
 
         decoder.setByteBuffer(null);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/aa424cab/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
index 2f52bff..1934532 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpWritableBuffer.java
@@ -106,6 +106,11 @@ public class AmqpWritableBuffer implements WritableBuffer {
     }
 
     @Override
+    public void ensureRemaining(int remianing) {
+        nettyBuffer.ensureWritable(remianing);
+    }
+
+    @Override
     public int position() {
         return nettyBuffer.writerIndex();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org