You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/21 22:07:45 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9eed307ce -> ef03ce4ee


ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

If a client-ack-mode consumer receives a message and closes without
acking it, the redelivered message won't have the redelivery flag
(JMSRedelivered) set, because the delivery count is not incremented
when the message is cancelled back to the queue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47b31b53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47b31b53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47b31b53

Branch: refs/heads/master
Commit: 47b31b53d608f762b9c38e924c5a9f8b92f384b5
Parents: 9eed307
Author: Howard Gao <ho...@gmail.com>
Authored: Wed May 16 11:14:48 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:00:08 2018 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  1 +
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 +-
 .../core/protocol/openwire/amq/AMQSession.java  | 14 ++++++
 .../openwire/amq/RedeliveryPolicyTest.java      | 52 ++++++++++++++++++++
 4 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6a10de7..f666785 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                   referenceIterator.remove();
                   ref.incrementDeliveryCount();
                   consumer.backToDelivering(ref);
+                  session.addRolledback(ref.getMessageID());
                }
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index e0b02ae..0b7eff5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -353,7 +353,7 @@ public class AMQConsumer {
    }
 
    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
-      long seqId = ref.getMessage().getMessageID();
+      long seqId = ref.getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
 
       //in activemq5, closing a durable subscription won't close the consumer
@@ -373,6 +373,8 @@ public class AMQConsumer {
          // tx cases are handled by
          // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
          ref.incrementDeliveryCount();
+      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+         ref.incrementDeliveryCount();
       }
 
       return true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index c3b1a20..34e2c0f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -52,6 +53,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -95,6 +97,8 @@ public class AMQSession implements SessionCallback {
 
    private final SimpleString clientId;
 
+   private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
+
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -308,6 +312,8 @@ public class AMQSession implements SessionCallback {
                           ServerConsumer consumer,
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
+      //clear up possible rolledback ids.
+      rollbackedIds.remove(message.getMessageID());
       // TODO: use encoders and proper conversions here
       return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
@@ -542,4 +548,12 @@ public class AMQSession implements SessionCallback {
    public boolean isInternal() {
       return sessInfo.getSessionId().getValue() == -1;
    }
+
+   public void addRolledback(long messageID) {
+      this.rollbackedIds.add(messageID);
+   }
+
+   public boolean isRolledBack(long mid) {
+      return rollbackedIds.remove(mid);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 7ee0eb9..3e50cc7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -21,9 +21,11 @@ import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
 import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
@@ -633,4 +635,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
       session.commit();
    }
 
+   @Test
+   public void testClientRedlivery() throws Exception {
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         this.makeSureCoreQueueExist("TEST");
+
+         Queue queue = session.createQueue("TEST");
+
+         MessageProducer producer = session.createProducer(queue);
+
+         producer.send(session.createTextMessage("test"));
+
+      } finally {
+         connection.close();
+      }
+
+      for (int i = 0; i < 10; ++i) {
+
+         connection = (ActiveMQConnection) factory.createConnection();
+
+         connection.start();
+
+         try {
+
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            Queue queue = session.createQueue("TEST");
+
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            Message message = consumer.receive(1000);
+
+            assertNotNull("Message null on iteration " + i, message);
+
+            System.out.println("received message: " + i);
+            System.out.println("is redelivered: " + message.getJMSRedelivered());
+            if (i > 0) {
+               assertTrue(message.getJMSRedelivered());
+            }
+
+         } finally {
+            connection.close();
+         }
+      }
+
+   }
+
 }


[3/3] activemq-artemis git commit: This closes #2090

Posted by cl...@apache.org.
This closes #2090


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef03ce4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef03ce4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef03ce4e

Branch: refs/heads/master
Commit: ef03ce4ee9704a611370fc13096358af8504b6dd
Parents: 9eed307 f24d97b
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon May 21 18:07:54 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:07:54 2018 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  2 +
 .../core/protocol/openwire/amq/AMQConsumer.java | 51 ++++++++++++++++++-
 .../core/protocol/openwire/amq/AMQSession.java  |  3 +-
 .../openwire/amq/RedeliveryPolicyTest.java      | 52 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

Posted by cl...@apache.org.
ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode

If a client-ack-mode consumer receives a message and closes without
acking it, the redelivered message won't have the redelivery flag
(JMSRedelivered) set, because the delivery count is not incremented
when the message is cancelled back to the queue.
(Perf improvement)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f24d97bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f24d97bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f24d97bf

Branch: refs/heads/master
Commit: f24d97bfd11b44c4ac7e672a1ec089ea9db9422a
Parents: 47b31b5
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed May 16 11:33:24 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:02:40 2018 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   |  3 +-
 .../core/protocol/openwire/amq/AMQConsumer.java | 51 +++++++++++++++++++-
 .../core/protocol/openwire/amq/AMQSession.java  | 15 +-----
 3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f666785..21b2d46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,7 +1294,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
                   referenceIterator.remove();
                   ref.incrementDeliveryCount();
                   consumer.backToDelivering(ref);
-                  session.addRolledback(ref.getMessageID());
+                  final AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
+                  amqConsumer.addRolledback(ref);
                }
             }
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 0b7eff5..7e9881b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -17,8 +17,11 @@
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -68,6 +71,7 @@ public class AMQConsumer {
    //internal means we don't expose
    //it's address/queue to management service
    private boolean internalAddress = false;
+   private volatile Set<MessageReference> rolledbackMessageRefs;
 
    public AMQConsumer(AMQSession amqSession,
                       org.apache.activemq.command.ActiveMQDestination d,
@@ -85,6 +89,30 @@ public class AMQConsumer {
          messagePullHandler = new MessagePullHandler();
       }
       this.internalAddress = internalAddress;
+      this.rolledbackMessageRefs = null;
+   }
+
+   private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
+      synchronized (this) {
+         Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
+         if (rollbackedMessageRefs == null) {
+            rollbackedMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID));
+            this.rolledbackMessageRefs = rollbackedMessageRefs;
+         }
+         return rollbackedMessageRefs;
+      }
+   }
+
+   private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
+      Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
+      if (rolledbackMessageRefs == null) {
+         rolledbackMessageRefs = guardedInitializationOfRolledBackMessageRefs();
+      }
+      return rolledbackMessageRefs;
+   }
+
+   private Set<MessageReference> getRolledbackMessageRefs() {
+      return this.rolledbackMessageRefs;
    }
 
    public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@@ -353,7 +381,6 @@ public class AMQConsumer {
    }
 
    public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
-      long seqId = ref.getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
 
       //in activemq5, closing a durable subscription won't close the consumer
@@ -373,7 +400,7 @@ public class AMQConsumer {
          // tx cases are handled by
          // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
          ref.incrementDeliveryCount();
-      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+      } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
          ref.incrementDeliveryCount();
       }
 
@@ -432,4 +459,24 @@ public class AMQConsumer {
          }
       }
    }
+
+   public boolean removeRolledback(MessageReference messageReference) {
+      final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
+      if (rolledbackMessageRefs == null) {
+         return false;
+      }
+      return rolledbackMessageRefs.remove(messageReference);
+   }
+
+   public void addRolledback(MessageReference messageReference) {
+      getRolledbackMessageRefsOrCreate().add(messageReference);
+   }
+
+   private boolean isRolledBack(MessageReference messageReference) {
+      final Set<MessageReference> rollbackedMessageRefs = getRolledbackMessageRefs();
+      if (rollbackedMessageRefs == null) {
+         return false;
+      }
+      return rollbackedMessageRefs.contains(messageReference);
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 34e2c0f..0250f1c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,7 +20,6 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -97,8 +95,6 @@ public class AMQSession implements SessionCallback {
 
    private final SimpleString clientId;
 
-   private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
-
    public AMQSession(ConnectionInfo connInfo,
                      SessionInfo sessInfo,
                      ActiveMQServer server,
@@ -313,8 +309,7 @@ public class AMQSession implements SessionCallback {
                           int deliveryCount) {
       AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
       //clear up possible rolledback ids.
-      rollbackedIds.remove(message.getMessageID());
-      // TODO: use encoders and proper conversions here
+      theConsumer.removeRolledback(reference);
       return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
    }
 
@@ -548,12 +543,4 @@ public class AMQSession implements SessionCallback {
    public boolean isInternal() {
       return sessInfo.getSessionId().getValue() == -1;
    }
-
-   public void addRolledback(long messageID) {
-      this.rollbackedIds.add(messageID);
-   }
-
-   public boolean isRolledBack(long mid) {
-      return rollbackedIds.remove(mid);
-   }
 }