You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/24 20:29:20 UTC

[05/33] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

If a client ack mode consumer receives a message and closes without
acking it, the redelivery of the message won't set the redelivery
flag (JMSRedelivered) because it doesn't increment the delivery count
when message is cancelled back to queue.
(Perf improvement)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f24d97bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f24d97bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f24d97bf

Branch: refs/heads/2.6.x
Commit: f24d97bfd11b44c4ac7e672a1ec089ea9db9422a
Parents: 47b31b5
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed May 16 11:33:24 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:02:40 2018 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  3 +-
 .../core/protocol/openwire/amq/AMQConsumer.java | 51 +++++++++++++++++++-
 .../core/protocol/openwire/amq/AMQSession.java  | 15 +-----
 3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f666785..21b2d46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,7 +1294,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                   referenceIterator.remove();
                   ref.incrementDeliveryCount();
                   consumer.backToDelivering(ref);
-                  session.addRolledback(ref.getMessageID());
+                  final AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
+                  amqConsumer.addRolledback(ref);
                }
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 0b7eff5..7e9881b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -17,8 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -68,6 +71,7 @@ public class AMQConsumer {
    //internal means we don't expose
    //it's address/queue to management service
    private boolean internalAddress = false;
+   private volatile Set<MessageReference> rolledbackMessageRefs;
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
@@ -85,6 +89,30 @@ public class AMQConsumer {
          messagePullHandler = new MessagePullHandler();
       }
       this.internalAddress = internalAddress;
+      this.rolledbackMessageRefs = null;
+   }
+
+   private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
+      synchronized (this) {
+         Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
+         if (rollbackedMessageRefs == null) {
+            rollbackedMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID));
+            this.rolledbackMessageRefs = rollbackedMessageRefs;
+         }
+         return rollbackedMessageRefs;
+      }
+   }
+
+   private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
+      Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
+      if (rolledbackMessageRefs == null) {
+         rolledbackMessageRefs = guardedInitializationOfRolledBackMessageRefs();
+      }
+      return rolledbackMessageRefs;
+   }
+
+   private Set<MessageReference> getRolledbackMessageRefs() {
+      return this.rolledbackMessageRefs;
    }
 
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@@ -353,7 +381,6 @@ public class AMQConsumer {
    }
 
    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
-      long seqId = ref.getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
 
       //in activemq5, closing a durable subscription won't close the consumer
@@ -373,7 +400,7 @@ public class AMQConsumer {
          // tx cases are handled by
          // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
          ref.incrementDeliveryCount();
-      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
          ref.incrementDeliveryCount();
       }
 
@@ -432,4 +459,24 @@ public class AMQConsumer {
          }
       }
    }
+
+   public boolean removeRolledback(MessageReference messageReference) {
+      final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
+      if (rolledbackMessageRefs == null) {
+         return false;
+      }
+      return rolledbackMessageRefs.remove(messageReference);
+   }
+
+   public void addRolledback(MessageReference messageReference) {
+      getRolledbackMessageRefsOrCreate().add(messageReference);
+   }
+
+   private boolean isRolledBack(MessageReference messageReference) {
+      final Set<MessageReference> rollbackedMessageRefs = getRolledbackMessageRefs();
+      if (rollbackedMessageRefs == null) {
+         return false;
+      }
+      return rollbackedMessageRefs.contains(messageReference);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 34e2c0f..0250f1c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,7 +20,6 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -97,8 +95,6 @@ public class AMQSession implements SessionCallback {
 
    private final SimpleString clientId;
 
-   private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
-
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -313,8 +309,7 @@ public class AMQSession implements SessionCallback {
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
       //clear up possible rolledback ids.
-      rollbackedIds.remove(message.getMessageID());
-      // TODO: use encoders and proper conversions here
+      theConsumer.removeRolledback(reference);
       return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
 
@@ -548,12 +543,4 @@ public class AMQSession implements SessionCallback {
    public boolean isInternal() {
       return sessInfo.getSessionId().getValue() == -1;
    }
-
-   public void addRolledback(long messageID) {
-      this.rollbackedIds.add(messageID);
-   }
-
-   public boolean isRolledBack(long mid) {
-      return rollbackedIds.remove(mid);
-   }
 }