You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/22 14:55:17 UTC

[1/3] activemq-artemis git commit: This closes #1114

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 10c9d797d -> d963a4a75


This closes #1114


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d963a4a7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d963a4a7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d963a4a7

Branch: refs/heads/master
Commit: d963a4a7581cca1f08c0d010dcd0bdc12b04bb60
Parents: 10c9d79 0bfb39b
Author: Justin Bertram <jb...@apache.org>
Authored: Wed Mar 22 09:55:02 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500

----------------------------------------------------------------------
 .../impl/netty/ActiveMQChannelHandler.java      |   6 +-
 .../netty/PartialPooledByteBufAllocator.java    |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  11 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 159 ++++++++++---------
 .../amqp/proton/handler/ProtonHandler.java      |   2 +-
 .../hornetq/HornetQProtocolManager.java         |   2 +-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 .../core/server/impl/ServerConsumerImpl.java    |  12 +-
 8 files changed, 110 insertions(+), 90 deletions(-)
----------------------------------------------------------------------



[3/3] activemq-artemis git commit: ARTEMIS-1051 Adding synchronized calls on ACKs

Posted by jb...@apache.org.
ARTEMIS-1051 Adding synchronized calls on ACKs


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ac7cafb2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ac7cafb2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ac7cafb2

Branch: refs/heads/master
Commit: ac7cafb210c2da126fe2f22ec54fcfe5de648d18
Parents: 10c9d79
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 21 10:33:36 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/ServerConsumerImpl.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ac7cafb2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 710a22b..e812f2e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -731,7 +731,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
     * This will be useful for other protocols that will need this such as openWire or MQTT.
     */
    @Override
-   public List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,
+   public synchronized List<MessageReference> getDeliveringReferencesBasedOnProtocol(boolean remove,
                                                                         Object protocolDataStart,
                                                                         Object protocolDataEnd) {
       LinkedList<MessageReference> retReferences = new LinkedList<>();
@@ -766,7 +766,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public void acknowledge(Transaction tx, final long messageID) throws Exception {
+   public synchronized void acknowledge(Transaction tx, final long messageID) throws Exception {
       if (browseOnly) {
          return;
       }
@@ -830,7 +830,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
+   public synchronized void individualAcknowledge(Transaction tx, final long messageID) throws Exception {
       if (browseOnly) {
          return;
       }
@@ -892,7 +892,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public void individualCancel(final long messageID, boolean failed) throws Exception {
+   public synchronized void individualCancel(final long messageID, boolean failed) throws Exception {
       if (browseOnly) {
          return;
       }
@@ -911,12 +911,12 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
    }
 
    @Override
-   public void backToDelivering(MessageReference reference) {
+   public synchronized void backToDelivering(MessageReference reference) {
       deliveringRefs.addFirst(reference);
    }
 
    @Override
-   public MessageReference removeReferenceByID(final long messageID) throws Exception {
+   public synchronized MessageReference removeReferenceByID(final long messageID) throws Exception {
       if (browseOnly) {
          return null;
       }


[2/3] activemq-artemis git commit: ARTEMIS-1056 Performance improvements on AMQP

Posted by jb...@apache.org.
ARTEMIS-1056 Performance improvements on AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0bfb39bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0bfb39bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0bfb39bf

Branch: refs/heads/master
Commit: 0bfb39bfb577200499f6ee2a80a00b65c2ef8d02
Parents: ac7cafb
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Mar 21 11:43:06 2017 -0400
Committer: Justin Bertram <jb...@apache.org>
Committed: Wed Mar 22 09:55:02 2017 -0500

----------------------------------------------------------------------
 .../impl/netty/ActiveMQChannelHandler.java      |   6 +-
 .../netty/PartialPooledByteBufAllocator.java    |   6 +-
 .../protocol/amqp/broker/AMQPMessage.java       |  11 +-
 .../amqp/proton/ProtonServerSenderContext.java  | 159 ++++++++++---------
 .../amqp/proton/handler/ProtonHandler.java      |   2 +-
 .../hornetq/HornetQProtocolManager.java         |   2 +-
 .../protocol/core/impl/CoreProtocolManager.java |   2 +-
 7 files changed, 104 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
index 93be281..cc4407c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java
@@ -65,7 +65,11 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
       ByteBuf buffer = (ByteBuf) msg;
 
-      handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      try {
+         handler.bufferReceived(channelId(ctx.channel()), new ChannelBufferWrapper(buffer));
+      } finally {
+         buffer.release();
+      }
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
index 5e67952..3a192e8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/PartialPooledByteBufAllocator.java
@@ -53,17 +53,17 @@ public class PartialPooledByteBufAllocator implements ByteBufAllocator {
 
    @Override
    public ByteBuf ioBuffer() {
-      return UNPOOLED.heapBuffer();
+      return POOLED.directBuffer();
    }
 
    @Override
    public ByteBuf ioBuffer(int initialCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity);
+      return POOLED.directBuffer(initialCapacity);
    }
 
    @Override
    public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
-      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+      return POOLED.directBuffer(initialCapacity, maxCapacity);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 7de9577..d02eace 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -63,7 +63,7 @@ public class AMQPMessage extends RefCountMessage {
    final long messageFormat;
    ByteBuf data;
    boolean bufferValid;
-   boolean durable;
+   Boolean durable;
    long messageID;
    String address;
    MessageImpl protonMessage;
@@ -491,10 +491,15 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public boolean isDurable() {
+      if (durable != null) {
+         return durable;
+      }
+
       if (getHeader() != null && getHeader().getDurable() != null) {
-         return getHeader().getDurable().booleanValue();
-      } else {
+         durable =  getHeader().getDurable().booleanValue();
          return durable;
+      } else {
+         return durable != null ? durable : false;
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index d24464c..780ca4d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -481,89 +481,100 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          return;
       }
 
-      Message message = ((MessageReference) delivery.getContext()).getMessage();
+      try {
+         Message message = ((MessageReference) delivery.getContext()).getMessage();
 
-      boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+         boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
 
-      DeliveryState remoteState = delivery.getRemoteState();
-
-      boolean settleImmediate = true;
-      if (remoteState != null) {
-         // If we are transactional then we need ack if the msg has been accepted
-         if (remoteState instanceof TransactionalState) {
-
-            TransactionalState txState = (TransactionalState) remoteState;
-            ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
-
-            if (txState.getOutcome() != null) {
-               settleImmediate = false;
-               Outcome outcome = txState.getOutcome();
-               if (outcome instanceof Accepted) {
-                  if (!delivery.remotelySettled()) {
-                     TransactionalState txAccepted = new TransactionalState();
-                     txAccepted.setOutcome(Accepted.getInstance());
-                     txAccepted.setTxnId(txState.getTxnId());
-                     delivery.disposition(txAccepted);
-                  }
-                  // we have to individual ack as we can't guarantee we will get the delivery
-                  // updates (including acks) in order
-                  // from dealer, a perf hit but a must
-                  try {
-                     sessionSPI.ack(tx, brokerConsumer, message);
-                     tx.addDelivery(delivery, this);
-                  } catch (Exception e) {
-                     throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+         DeliveryState remoteState;
+
+         synchronized (connection.getLock()) {
+            remoteState = delivery.getRemoteState();
+         }
+
+         boolean settleImmediate = true;
+         if (remoteState != null) {
+            // If we are transactional then we need ack if the msg has been accepted
+            if (remoteState instanceof TransactionalState) {
+
+               TransactionalState txState = (TransactionalState) remoteState;
+               ProtonTransactionImpl tx = (ProtonTransactionImpl) this.sessionSPI.getTransaction(txState.getTxnId(), false);
+
+               if (txState.getOutcome() != null) {
+                  settleImmediate = false;
+                  Outcome outcome = txState.getOutcome();
+                  if (outcome instanceof Accepted) {
+                     if (!delivery.remotelySettled()) {
+                        TransactionalState txAccepted = new TransactionalState();
+                        txAccepted.setOutcome(Accepted.getInstance());
+                        txAccepted.setTxnId(txState.getTxnId());
+                        synchronized (connection.getLock()) {
+                           delivery.disposition(txAccepted);
+                        }
+                     }
+                     // we have to individual ack as we can't guarantee we will get the delivery
+                     // updates (including acks) in order
+                     // from dealer, a perf hit but a must
+                     try {
+                        sessionSPI.ack(tx, brokerConsumer, message);
+                        tx.addDelivery(delivery, this);
+                     } catch (Exception e) {
+                        throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+                     }
                   }
                }
-            }
-         } else if (remoteState instanceof Accepted) {
-            //this can happen in the twice ack mode, that is the receiver accepts and settles separately
-            //acking again would show an exception but would have no negative effect but best to handle anyway.
-            if (delivery.isSettled()) {
-               return;
-            }
-            // we have to individual ack as we can't guarantee we will get the delivery updates
-            // (including acks) in order
-            // from dealer, a perf hit but a must
-            try {
-               sessionSPI.ack(null, brokerConsumer, message);
-            } catch (Exception e) {
-               log.warn(e.toString(), e);
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
-            }
-         } else if (remoteState instanceof Released) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, false);
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
-            }
-         } else if (remoteState instanceof Rejected) {
-            try {
-               sessionSPI.cancel(brokerConsumer, message, true);
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
-            }
-         } else if (remoteState instanceof Modified) {
-            try {
-               Modified modification = (Modified) remoteState;
-               if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
-                  sessionSPI.cancel(brokerConsumer, message, true);
-               } else {
+            } else if (remoteState instanceof Accepted) {
+               //this can happen in the twice ack mode, that is the receiver accepts and settles separately
+               //acking again would show an exception but would have no negative effect but best to handle anyway.
+               if (delivery.isSettled()) {
+                  return;
+               }
+               // we have to individual ack as we can't guarantee we will get the delivery updates
+               // (including acks) in order
+               // from dealer, a perf hit but a must
+               try {
+                  sessionSPI.ack(null, brokerConsumer, message);
+               } catch (Exception e) {
+                  log.warn(e.toString(), e);
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage());
+               }
+            } else if (remoteState instanceof Released) {
+               try {
                   sessionSPI.cancel(brokerConsumer, message, false);
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+               }
+            } else if (remoteState instanceof Rejected) {
+               try {
+                  sessionSPI.cancel(brokerConsumer, message, true);
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
+               }
+            } else if (remoteState instanceof Modified) {
+               try {
+                  Modified modification = (Modified) remoteState;
+                  if (Boolean.TRUE.equals(modification.getDeliveryFailed())) {
+                     sessionSPI.cancel(brokerConsumer, message, true);
+                  } else {
+                     sessionSPI.cancel(brokerConsumer, message, false);
+                  }
+               } catch (Exception e) {
+                  throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
                }
-            } catch (Exception e) {
-               throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage());
             }
-         }
-         // todo add tag caching
-         if (!preSettle) {
-            protonSession.replaceTag(delivery.getTag());
-         }
+            // todo add tag caching
+            if (!preSettle) {
+               protonSession.replaceTag(delivery.getTag());
+            }
 
-         if (settleImmediate) settle(delivery);
+            if (settleImmediate)
+               settle(delivery);
 
-      } else {
-         // todo not sure if we need to do anything here
+         } else {
+            // todo not sure if we need to do anything here
+         }
+      } finally {
+         connection.flush();
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
index 673a688..6b66f62 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java
@@ -240,7 +240,7 @@ public class ProtonHandler extends ProtonInitializable {
          }
 
          // For returning PooledBytes
-         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size);
+         ByteBuf buffer = PooledByteBufAllocator.DEFAULT.directBuffer(size);
          ByteBuffer head = transport.head();
          head.position(offset);
          head.limit(offset + size);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
index 2f6ed2f..de7d2ff 100644
--- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
+++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java
@@ -49,7 +49,7 @@ class HornetQProtocolManager extends CoreProtocolManager {
          buffer.getByte(5) == 'T' &&
          buffer.getByte(6) == 'Q') {
          //todo add some handshaking
-         buffer.readBytes(7);
+         buffer.skipBytes(7);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0bfb39bf/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index cc81fbe..2cfd451 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -174,7 +174,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
       //if we are not an old client then handshake
       if (isArtemis(buffer)) {
-         buffer.readBytes(7);
+         buffer.skipBytes(7);
       }
    }