You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/11/16 01:18:44 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1938 Update proton-j to 0.30.0 and Qpid JMS 0.37.0

ARTEMIS-1938 Update proton-j to 0.30.0 and Qpid JMS 0.37.0

Update to latest proton-j release and refactor the dispostion code to use
the new type enums to better deal with the dispistions.  Updates to Qpid JMS
0.37.0 which still uses the current netty 4.1.28.Final dependency.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/593348b9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/593348b9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/593348b9

Branch: refs/heads/master
Commit: 593348b9ada7e7bec4b62b2b65d537b2ac25dd29
Parents: 9263bb4
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Nov 14 16:31:46 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 15 20:18:37 2018 -0500

----------------------------------------------------------------------
 .../amqp/proton/ProtonServerSenderContext.java  | 83 ++++++++++++--------
 .../protocol/amqp/util/NettyWritable.java       |  5 ++
 pom.xml                                         |  4 +-
 3 files changed, 58 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 24dcff0..c4aca48 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -56,14 +56,13 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
+import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
@@ -546,22 +545,45 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          Message message = ((MessageReference) delivery.getContext()).getMessage();
          DeliveryState remoteState = delivery.getRemoteState();
 
-         boolean settleImmediate = true;
-         if (remoteState instanceof Accepted) {
+         if (remoteState != null && remoteState.getType() == DeliveryStateType.Accepted) {
             // this can happen in the twice ack mode, that is the receiver accepts and settles separately
             // acking again would show an exception but would have no negative effect but best to handle anyway.
-            if (delivery.isSettled()) {
-               return;
-            }
-            // we have to individual ack as we can't guarantee we will get the delivery updates
-            // (including acks) in order from dealer, a performance hit but a must
-            try {
-               sessionSPI.ack(null, brokerConsumer, message);
-            } catch (Exception e) {
-               log.warn(e.toString(), e);
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+            if (!delivery.isSettled()) {
+               // we have to individual ack as we can't guarantee we will get the delivery updates
+               // (including acks) in order from dealer, a performance hit but a must
+               try {
+                  sessionSPI.ack(null, brokerConsumer, message);
+               } catch (Exception e) {
+                  log.warn(e.toString(), e);
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+               }
+
+               delivery.settle();
             }
-         } else if (remoteState instanceof TransactionalState) {
+         } else {
+            handleExtendedDeliveryOutcomes(message, delivery, remoteState);
+         }
+
+         if (!preSettle) {
+            protonSession.replaceTag(delivery.getTag());
+         }
+      } finally {
+         sessionSPI.afterIO(connectionFlusher);
+         sessionSPI.resetContext(oldContext);
+      }
+   }
+
+   private boolean handleExtendedDeliveryOutcomes(Message message, Delivery delivery, DeliveryState remoteState) throws ActiveMQAMQPException {
+      boolean settleImmediate = true;
+      boolean handled = true;
+
+      if (remoteState == null) {
+         log.debug("Received null disposition for delivery update: " + remoteState);
+         return true;
+      }
+
+      switch (remoteState.getType()) {
+         case Transactional:
             // When the message arrives with a TransactionState disposition the ack should
             // enlist the message into the transaction associated with the given txn ID.
             TransactionalState txState = (TransactionalState) remoteState;
@@ -587,19 +609,22 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                   }
                }
             }
-         } else if (remoteState instanceof Released) {
+            break;
+         case Released:
             try {
                sessionSPI.cancel(brokerConsumer, message, false);
             } catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
             }
-         } else if (remoteState instanceof Rejected) {
+            break;
+         case Rejected:
             try {
                sessionSPI.reject(brokerConsumer, message);
             } catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
             }
-         } else if (remoteState instanceof Modified) {
+            break;
+         case Modified:
             try {
                Modified modification = (Modified) remoteState;
 
@@ -615,23 +640,17 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             } catch (Exception e) {
                throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
             }
-         } else {
+            break;
+         default:
             log.debug("Received null or unknown disposition for delivery update: " + remoteState);
-            return;
-         }
-
-         if (!preSettle) {
-            protonSession.replaceTag(delivery.getTag());
-         }
-
-         if (settleImmediate) {
-            delivery.settle();
-         }
+            handled = false;
+      }
 
-      } finally {
-         sessionSPI.afterIO(connectionFlusher);
-         sessionSPI.resetContext(oldContext);
+      if (settleImmediate) {
+         delivery.settle();
       }
+
+      return handled;
    }
 
    private final class ConnectionFlushIOCallback implements IOCallback {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
index d752bd7..6f363db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyWritable.java
@@ -86,6 +86,11 @@ public class NettyWritable implements WritableBuffer {
    }
 
    @Override
+   public void ensureRemaining(int remaining) {
+      nettyBuffer.ensureWritable(remaining);
+   }
+
+   @Override
    public int position() {
       return nettyBuffer.writerIndex();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/593348b9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 87acfca..f435121 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,10 +92,10 @@
       <mockito.version>2.8.47</mockito.version>
       <netty.version>4.1.28.Final</netty.version>
       <netty-tcnative-version>2.0.12.Final</netty-tcnative-version>
-      <proton.version>0.29.0</proton.version>
+      <proton.version>0.30.0</proton.version>
       <resteasy.version>3.0.19.Final</resteasy.version>
       <slf4j.version>1.7.21</slf4j.version>
-      <qpid.jms.version>0.36.0</qpid.jms.version>
+      <qpid.jms.version>0.37.0</qpid.jms.version>
       <johnzon.version>0.9.5</johnzon.version>
       <json-p.spec.version>1.0-alpha-1</json-p.spec.version>
       <javax.inject.version>1</javax.inject.version>