You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/05/21 22:07:45 UTC
[1/3] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add
delivery count in client ack mode
Repository: activemq-artemis
Updated Branches:
refs/heads/master 9eed307ce -> ef03ce4ee
ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode
If a client ack mode consumer receives a message and closes without
acking it, the redelivered message won't have the redelivery flag
(JMSRedelivered) set, because the delivery count is not incremented
when the message is cancelled back to the queue.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47b31b53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47b31b53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47b31b53
Branch: refs/heads/master
Commit: 47b31b53d608f762b9c38e924c5a9f8b92f384b5
Parents: 9eed307
Author: Howard Gao <ho...@gmail.com>
Authored: Wed May 16 11:14:48 2018 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:00:08 2018 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 1 +
.../core/protocol/openwire/amq/AMQConsumer.java | 4 +-
.../core/protocol/openwire/amq/AMQSession.java | 14 ++++++
.../openwire/amq/RedeliveryPolicyTest.java | 52 ++++++++++++++++++++
4 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 6a10de7..f666785 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,6 +1294,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
referenceIterator.remove();
ref.incrementDeliveryCount();
consumer.backToDelivering(ref);
+ session.addRolledback(ref.getMessageID());
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index e0b02ae..0b7eff5 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -353,7 +353,7 @@ public class AMQConsumer {
}
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
- long seqId = ref.getMessage().getMessageID();
+ long seqId = ref.getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
//in activemq5, closing a durable subscription won't close the consumer
@@ -373,6 +373,8 @@ public class AMQConsumer {
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
+ } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+ ref.incrementDeliveryCount();
}
return true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index c3b1a20..34e2c0f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,6 +20,7 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -52,6 +53,7 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@@ -95,6 +97,8 @@ public class AMQSession implements SessionCallback {
private final SimpleString clientId;
+ private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
+
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -308,6 +312,8 @@ public class AMQSession implements SessionCallback {
ServerConsumer consumer,
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
+ //clear up possible rolledback ids.
+ rollbackedIds.remove(message.getMessageID());
// TODO: use encoders and proper conversions here
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
}
@@ -542,4 +548,12 @@ public class AMQSession implements SessionCallback {
public boolean isInternal() {
return sessInfo.getSessionId().getValue() == -1;
}
+
+ public void addRolledback(long messageID) {
+ this.rollbackedIds.add(messageID);
+ }
+
+ public boolean isRolledBack(long mid) {
+ return rollbackedIds.remove(mid);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47b31b53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index 7ee0eb9..3e50cc7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -21,9 +21,11 @@ import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
@@ -633,4 +635,54 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest {
session.commit();
}
+ @Test
+ public void testClientRedlivery() throws Exception {
+
+ try {
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ this.makeSureCoreQueueExist("TEST");
+
+ Queue queue = session.createQueue("TEST");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ producer.send(session.createTextMessage("test"));
+
+ } finally {
+ connection.close();
+ }
+
+ for (int i = 0; i < 10; ++i) {
+
+ connection = (ActiveMQConnection) factory.createConnection();
+
+ connection.start();
+
+ try {
+
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue("TEST");
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(1000);
+
+ assertNotNull("Message null on iteration " + i, message);
+
+ System.out.println("received message: " + i);
+ System.out.println("is redelivered: " + message.getJMSRedelivered());
+ if (i > 0) {
+ assertTrue(message.getJMSRedelivered());
+ }
+
+ } finally {
+ connection.close();
+ }
+ }
+
+ }
+
}
[3/3] activemq-artemis git commit: This closes #2090
Posted by cl...@apache.org.
This closes #2090
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ef03ce4e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ef03ce4e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ef03ce4e
Branch: refs/heads/master
Commit: ef03ce4ee9704a611370fc13096358af8504b6dd
Parents: 9eed307 f24d97b
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon May 21 18:07:54 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:07:54 2018 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 2 +
.../core/protocol/openwire/amq/AMQConsumer.java | 51 ++++++++++++++++++-
.../core/protocol/openwire/amq/AMQSession.java | 3 +-
.../openwire/amq/RedeliveryPolicyTest.java | 52 ++++++++++++++++++++
4 files changed, 106 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-1868 Openwire doesn't add
delivery count in client ack mode
Posted by cl...@apache.org.
ARTEMIS-1868 Openwire doesn't add delivery count in client ack mode
If a client ack mode consumer receives a message and closes without
acking it, the redelivered message won't have the redelivery flag
(JMSRedelivered) set, because the delivery count is not incremented
when the message is cancelled back to the queue.
(Perf improvement)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f24d97bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f24d97bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f24d97bf
Branch: refs/heads/master
Commit: f24d97bfd11b44c4ac7e672a1ec089ea9db9422a
Parents: 47b31b5
Author: Francesco Nigro <ni...@gmail.com>
Authored: Wed May 16 11:33:24 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon May 21 18:02:40 2018 -0400
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 3 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 51 +++++++++++++++++++-
.../core/protocol/openwire/amq/AMQSession.java | 15 +-----
3 files changed, 52 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index f666785..21b2d46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1294,7 +1294,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
referenceIterator.remove();
ref.incrementDeliveryCount();
consumer.backToDelivering(ref);
- session.addRolledback(ref.getMessageID());
+ final AMQConsumer amqConsumer = (AMQConsumer) consumer.getProtocolData();
+ amqConsumer.addRolledback(ref);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 0b7eff5..7e9881b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -17,8 +17,11 @@
package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.io.IOException;
+import java.util.Comparator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -68,6 +71,7 @@ public class AMQConsumer {
//internal means we don't expose
//it's address/queue to management service
private boolean internalAddress = false;
+ private volatile Set<MessageReference> rolledbackMessageRefs;
public AMQConsumer(AMQSession amqSession,
org.apache.activemq.command.ActiveMQDestination d,
@@ -85,6 +89,30 @@ public class AMQConsumer {
messagePullHandler = new MessagePullHandler();
}
this.internalAddress = internalAddress;
+ this.rolledbackMessageRefs = null;
+ }
+
+ private Set<MessageReference> guardedInitializationOfRolledBackMessageRefs() {
+ synchronized (this) {
+ Set<MessageReference> rollbackedMessageRefs = this.rolledbackMessageRefs;
+ if (rollbackedMessageRefs == null) {
+ rollbackedMessageRefs = new ConcurrentSkipListSet<>(Comparator.comparingLong(MessageReference::getMessageID));
+ this.rolledbackMessageRefs = rollbackedMessageRefs;
+ }
+ return rollbackedMessageRefs;
+ }
+ }
+
+ private Set<MessageReference> getRolledbackMessageRefsOrCreate() {
+ Set<MessageReference> rolledbackMessageRefs = this.rolledbackMessageRefs;
+ if (rolledbackMessageRefs == null) {
+ rolledbackMessageRefs = guardedInitializationOfRolledBackMessageRefs();
+ }
+ return rolledbackMessageRefs;
+ }
+
+ private Set<MessageReference> getRolledbackMessageRefs() {
+ return this.rolledbackMessageRefs;
}
public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, long nativeId) throws Exception {
@@ -353,7 +381,6 @@ public class AMQConsumer {
}
public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
- long seqId = ref.getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
//in activemq5, closing a durable subscription won't close the consumer
@@ -373,7 +400,7 @@ public class AMQConsumer {
// tx cases are handled by
// org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
ref.incrementDeliveryCount();
- } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !session.isRolledBack(seqId)) {
+ } else if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNSET && !isRolledBack(ref)) {
ref.incrementDeliveryCount();
}
@@ -432,4 +459,24 @@ public class AMQConsumer {
}
}
}
+
+ public boolean removeRolledback(MessageReference messageReference) {
+ final Set<MessageReference> rolledbackMessageRefs = getRolledbackMessageRefs();
+ if (rolledbackMessageRefs == null) {
+ return false;
+ }
+ return rolledbackMessageRefs.remove(messageReference);
+ }
+
+ public void addRolledback(MessageReference messageReference) {
+ getRolledbackMessageRefsOrCreate().add(messageReference);
+ }
+
+ private boolean isRolledBack(MessageReference messageReference) {
+ final Set<MessageReference> rollbackedMessageRefs = getRolledbackMessageRefs();
+ if (rollbackedMessageRefs == null) {
+ return false;
+ }
+ return rollbackedMessageRefs.contains(messageReference);
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/f24d97bf/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 34e2c0f..0250f1c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -20,7 +20,6 @@ import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUt
import java.io.IOException;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,7 +52,6 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
-import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@@ -97,8 +95,6 @@ public class AMQSession implements SessionCallback {
private final SimpleString clientId;
- private final Set<Long> rollbackedIds = new ConcurrentHashSet<>();
-
public AMQSession(ConnectionInfo connInfo,
SessionInfo sessInfo,
ActiveMQServer server,
@@ -313,8 +309,7 @@ public class AMQSession implements SessionCallback {
int deliveryCount) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
//clear up possible rolledback ids.
- rollbackedIds.remove(message.getMessageID());
- // TODO: use encoders and proper conversions here
+ theConsumer.removeRolledback(reference);
return theConsumer.handleDeliver(reference, message.toCore(), deliveryCount);
}
@@ -548,12 +543,4 @@ public class AMQSession implements SessionCallback {
public boolean isInternal() {
return sessInfo.getSessionId().getValue() == -1;
}
-
- public void addRolledback(long messageID) {
- this.rollbackedIds.add(messageID);
- }
-
- public boolean isRolledBack(long mid) {
- return rollbackedIds.remove(mid);
- }
}