You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/12 16:23:59 UTC

[2/2] activemq-artemis git commit: ARTEMIS-2067 Clean up some code in the AMQP protocol handling paths

ARTEMIS-2067 Clean up some code in the AMQP protocol handling paths

Cleans up some of the code on the proton event handler, most notably:

1. Fix IOCallback creation on each outbound send, use single instance
as the handler only ever does a flush and has no attached state.
2. Fix redundant locking and unlocking of the connection lock on the event
path that already ensures that the lock is held.
3. Set presettle state on the server sender at attach as it cannot
change afterwards so checking on every message is not needed.
4. Improve buffer type checking on receive to reduce the amount of work.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d1939620
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d1939620
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d1939620

Branch: refs/heads/master
Commit: d1939620c03dab8f4a3ac5b7da159b8c27f078d6
Parents: a478eaf
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Sep 11 17:04:39 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Sep 12 12:23:49 2018 -0400

----------------------------------------------------------------------
 .../proton/ProtonServerReceiverContext.java     | 32 +++------
 .../amqp/proton/ProtonServerSenderContext.java  | 70 ++++++++------------
 .../amqp/proton/handler/ProtonHandler.java      | 38 ++++++-----
 .../transaction/ProtonTransactionHandler.java   | 57 ++++++----------
 .../proton/ProtonServerReceiverContextTest.java |  4 +-
 5 files changed, 82 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1939620/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 30dd10a..cdd1362 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -205,23 +205,20 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
    }
 
    /*
-   * called when Proton receives a message to be delivered via a Delivery.
-   *
-   * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
-   *
-   * */
+    * called when Proton receives a message to be delivered via a Delivery.
+    *
+    * This may be called more than once per deliver so we have to cache the buffer until we have received it all.
+    */
    @Override
    public void onMessage(Delivery delivery) throws ActiveMQAMQPException {
-      Receiver receiver;
       try {
+         Receiver receiver = ((Receiver) delivery.getLink());
 
-         if (!delivery.isReadable()) {
+         if (receiver.current() != delivery) {
             return;
          }
 
          if (delivery.isAborted()) {
-            receiver = ((Receiver) delivery.getLink());
-
             // Aborting implicitly remotely settles, so advance
             // receiver to the next delivery and settle locally.
             receiver.advance();
@@ -233,16 +230,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
             }
 
             return;
-         }
-
-         if (delivery.isPartial()) {
+         } else if (delivery.isPartial()) {
             return;
          }
 
-         receiver = ((Receiver) delivery.getLink());
-
          Transaction tx = null;
-
          ReadableBuffer data = receiver.recv();
          receiver.advance();
 
@@ -267,13 +259,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
          condition.setDescription(e.getMessage());
          rejected.setError(condition);
-         connection.lock();
-         try {
-            delivery.disposition(rejected);
-            delivery.settle();
-         } finally {
-            connection.unlock();
-         }
+
+         delivery.disposition(rejected);
+         delivery.settle();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1939620/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 0b40ee2..24dcff0 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -87,6 +87,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private static final Symbol SHARED = Symbol.valueOf("shared");
    private static final Symbol GLOBAL = Symbol.valueOf("global");
 
+   private final ConnectionFlushIOCallback connectionFlusher = new ConnectionFlushIOCallback();
+
    private Consumer brokerConsumer;
 
    protected final AMQPSessionContext protonSession;
@@ -101,6 +103,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
    private boolean shared = false;
    private boolean global = false;
    private boolean isVolatile = false;
+   private boolean preSettle;
    private SimpleString tempQueueName;
 
    public ProtonServerSenderContext(AMQPConnectionContext connection,
@@ -417,6 +420,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
       }
 
+      // Detect if sender is in pre-settle mode.
+      preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+
       // We need to update the source with any filters we support otherwise the client
       // is free to consider the attach as having failed if we don't send back what we
       // do support or if we send something we don't support the client won't know we
@@ -538,17 +544,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
       try {
          Message message = ((MessageReference) delivery.getContext()).getMessage();
-
-         boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-
-         DeliveryState remoteState;
-
-         connection.lock();
-         try {
-            remoteState = delivery.getRemoteState();
-         } finally {
-            connection.unlock();
-         }
+         DeliveryState remoteState = delivery.getRemoteState();
 
          boolean settleImmediate = true;
          if (remoteState instanceof Accepted) {
@@ -558,8 +554,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                return;
             }
             // we have to individual ack as we can't guarantee we will get the delivery updates
-            // (including acks) in order
-            // from dealer, a perf hit but a must
+            // (including acks) in order from dealer, a performance hit but a must
             try {
                sessionSPI.ack(null, brokerConsumer, message);
             } catch (Exception e) {
@@ -580,16 +575,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                      TransactionalState txAccepted = new TransactionalState();
                      txAccepted.setOutcome(Accepted.getInstance());
                      txAccepted.setTxnId(txState.getTxnId());
-                     connection.lock();
-                     try {
-                        delivery.disposition(txAccepted);
-                     } finally {
-                        connection.unlock();
-                     }
+                     delivery.disposition(txAccepted);
                   }
                   // we have to individual ack as we can't guarantee we will get the delivery
-                  // updates (including acks) in order
-                  // from dealer, a perf hit but a must
+                  // (including acks) in order from dealer, a performance hit but a must
                   try {
                      sessionSPI.ack(tx, brokerConsumer, message);
                      tx.addDelivery(delivery, this);
@@ -636,23 +625,24 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          }
 
          if (settleImmediate) {
-            settle(delivery);
+            delivery.settle();
          }
 
       } finally {
-         sessionSPI.afterIO(new IOCallback() {
-            @Override
-            public void done() {
-               connection.flush();
-            }
+         sessionSPI.afterIO(connectionFlusher);
+         sessionSPI.resetContext(oldContext);
+      }
+   }
 
-            @Override
-            public void onError(int errorCode, String errorMessage) {
-               connection.flush();
-            }
-         });
+   private final class ConnectionFlushIOCallback implements IOCallback {
+      @Override
+      public void done() {
+         connection.flush();
+      }
 
-         sessionSPI.resetContext(oldContext);
+      @Override
+      public void onError(int errorCode, String errorMessage) {
+         connection.flush();
       }
    }
 
@@ -681,16 +671,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       AMQPMessage message = CoreAmqpConverter.checkAMQP(messageReference.getMessage());
       sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) transportConnection.getProtocolConnection());
 
-      // presettle means we can settle the message on the dealer side before we send it, i.e.
-      // for browsers
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
-
       // we only need a tag if we are going to settle later
       byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
 
       // Let the Message decide how to present the message bytes
-      boolean attemptRelease = true;
       ReadableBuffer sendBuffer = message.getSendBuffer(deliveryCount);
+      boolean releaseRequired = sendBuffer instanceof NettyReadable;
 
       try {
          int size = sendBuffer.remaining();
@@ -713,14 +699,13 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             delivery.setMessageFormat((int) message.getMessageFormat());
             delivery.setContext(messageReference);
 
-            if (sendBuffer instanceof NettyReadable) {
+            if (releaseRequired) {
                sender.send(sendBuffer);
                // Above send copied, so release now if needed
-               attemptRelease = false;
+               releaseRequired = false;
                ((NettyReadable) sendBuffer).getByteBuf().release();
             } else {
                // Don't have pooled content, no need to release or copy.
-               attemptRelease = false;
                sender.sendNoCopy(sendBuffer);
             }
 
@@ -731,6 +716,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             } else {
                sender.advance();
             }
+
             connection.flush();
          } finally {
             connection.unlock();
@@ -738,7 +724,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
 
          return size;
       } finally {
-         if (attemptRelease && sendBuffer instanceof NettyReadable) {
+         if (releaseRequired) {
             ((NettyReadable) sendBuffer).getByteBuf().release();
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1939620/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 38ca7a7..694c1d3 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -243,24 +243,9 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
             int capacity = transport.capacity();
 
             if (!receivedFirstPacket) {
-               try {
-                  byte auth = buffer.getByte(4);
-                  if (auth == SASL || auth == BARE) {
-                     if (isServer) {
-                        dispatchAuth(auth == SASL);
-                     } else if (auth == BARE && clientSASLMechanism == null) {
-                        dispatchAuthSuccess();
-                     }
-                     /*
-                     * there is a chance that if SASL Handshake has been carried out that the capacity may change.
-                     * */
-                     capacity = transport.capacity();
-                  }
-               } catch (Throwable e) {
-                  log.warn(e.getMessage(), e);
-               }
-
-               receivedFirstPacket = true;
+               handleFirstPacket(buffer);
+               // there is a chance that if SASL Handshake has been carried out that the capacity may change.
+               capacity = transport.capacity();
             }
 
             if (capacity > 0) {
@@ -537,4 +522,21 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
       sasl.client();
       sasl.setListener(this);
    }
+
+   private void handleFirstPacket(ByteBuf buffer) {
+      try {
+         byte auth = buffer.getByte(4);
+         if (auth == SASL || auth == BARE) {
+            if (isServer) {
+               dispatchAuth(auth == SASL);
+            } else if (auth == BARE && clientSASLMechanism == null) {
+               dispatchAuthSuccess();
+            }
+         }
+      } catch (Throwable e) {
+         log.warn(e.getMessage(), e);
+      }
+
+      receivedFirstPacket = true;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1939620/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
index 28573e0..9ccc196 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionHandler.java
@@ -73,32 +73,27 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
          ByteBuffer buffer;
          MessageImpl msg;
 
-         connection.lock();
-         try {
-            // Replenish coordinator receiver credit on exhaustion so sender can continue
-            // transaction declare and discahrge operations.
-            if (receiver.getCredit() < amqpLowMark) {
-               receiver.flow(amqpCredit);
-            }
+         // Replenish coordinator receiver credit on exhaustion so sender can continue
+         // transaction declare and discahrge operations.
+         if (receiver.getCredit() < amqpLowMark) {
+            receiver.flow(amqpCredit);
+         }
 
-            // Declare is generally 7 bytes and discharge is around 48 depending on the
-            // encoded size of the TXN ID.  Decode buffer has a bit of extra space but if
-            // the incoming request is to big just use a scratch buffer.
-            if (delivery.available() > DECODE_BUFFER.capacity()) {
-               buffer = ByteBuffer.allocate(delivery.available());
-            } else {
-               buffer = (ByteBuffer) DECODE_BUFFER.clear();
-            }
+         // Declare is generally 7 bytes and discharge is around 48 depending on the
+         // encoded size of the TXN ID.  Decode buffer has a bit of extra space but if
+         // the incoming request is to big just use a scratch buffer.
+         if (delivery.available() > DECODE_BUFFER.capacity()) {
+            buffer = ByteBuffer.allocate(delivery.available());
+         } else {
+            buffer = (ByteBuffer) DECODE_BUFFER.clear();
+         }
 
-            // Update Buffer for the next incoming command.
-            buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
+         // Update Buffer for the next incoming command.
+         buffer.limit(receiver.recv(buffer.array(), buffer.arrayOffset(), buffer.capacity()));
 
-            receiver.advance();
+         receiver.advance();
 
-            msg = decodeMessage(buffer);
-         } finally {
-            connection.unlock();
-         }
+         msg = decodeMessage(buffer);
 
          Object action = ((AmqpValue) msg.getBody()).getValue();
          if (action instanceof Declare) {
@@ -160,23 +155,13 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
          }
       } catch (ActiveMQAMQPException amqpE) {
          log.warn(amqpE.getMessage(), amqpE);
-         connection.lock();
-         try {
-            delivery.settle();
-            delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
-         } finally {
-            connection.unlock();
-         }
+         delivery.settle();
+         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
          connection.flush();
       } catch (Throwable e) {
          log.warn(e.getMessage(), e);
-         connection.lock();
-         try {
-            delivery.settle();
-            delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
-         } finally {
-            connection.unlock();
-         }
+         delivery.settle();
+         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
          connection.flush();
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d1939620/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index 88dfe3a..a157ef1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -49,17 +49,19 @@ public class ProtonServerReceiverContextTest {
       ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver);
 
       Delivery mockDelivery = mock(Delivery.class);
-      when(mockDelivery.isReadable()).thenReturn(true);
       when(mockDelivery.isAborted()).thenReturn(true);
       when(mockDelivery.isPartial()).thenReturn(true);
       when(mockDelivery.getLink()).thenReturn(mockReceiver);
 
+      when(mockReceiver.current()).thenReturn(mockDelivery);
+
       if (drain) {
          when(mockReceiver.getDrain()).thenReturn(true);
       }
 
       rc.onMessage(mockDelivery);
 
+      verify(mockReceiver, times(1)).current();
       verify(mockReceiver, times(1)).advance();
       verify(mockDelivery, times(1)).settle();