You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/01 17:38:46 UTC
[50/52] [abbrv] activemq-artemis git commit: Fixed some redelivery
tests
Fixed some redelivery tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/552be8c5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/552be8c5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/552be8c5
Branch: refs/heads/refactor-openwire
Commit: 552be8c5ffe50eaac5030174bd438bc9b7b30ee2
Parents: 4a4b682
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Feb 26 22:24:03 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 1 11:37:32 2016 -0500
----------------------------------------------------------------------
.../openwire/amq/AMQServerConsumer.java | 22 ++++++++++++++++++++
.../protocol/openwire/amq/AMQServerSession.java | 7 +++++++
.../core/server/impl/ServerConsumerImpl.java | 16 ++++++++------
.../activemq/test/JmsTopicSendReceiveTest.java | 2 ++
4 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index 3f94351..34789b0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -35,6 +35,7 @@ public class AMQServerConsumer extends ServerConsumerImpl {
// TODO-NOW: remove this once unified
AMQConsumer amqConsumer;
+ boolean isClosing;
public AMQConsumer getAmqConsumer() {
return amqConsumer;
@@ -67,6 +68,18 @@ public class AMQServerConsumer extends ServerConsumerImpl {
this.browserDeliverer = newBrowserDeliverer;
}
+ public void closing() {
+ isClosing = true;
+ }
+
+ @Override
+ public HandleStatus handle(final MessageReference ref) throws Exception {
+ if (isClosing) {
+ return HandleStatus.BUSY;
+ }
+ return super.handle(ref);
+ }
+
private class AMQBrowserDeliverer extends BrowserDeliverer {
private BrowserListener listener = null;
@@ -174,4 +187,13 @@ public class AMQServerConsumer extends ServerConsumerImpl {
}
}
+ @Override
+ protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+ //activemq5 doesn't decrease the count
+ //when not failed.
+ if (failed) {
+ ref.decrementDeliveryCount();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index 5403830..390b58f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
@Override
protected void doClose(final boolean failed) throws Exception {
+ Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
+ for (ServerConsumer consumer : consumersClone) {
+ AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
+ amqConsumer.closing();//prevent redeliver
+ }
+
synchronized (this) {
if (tx != null && tx.getXid() == null) {
((AMQTransactionImpl) tx).setRollbackForClose();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index ab9dec9..08185db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
else {
refs.add(ref);
- if (!failed) {
- // We don't decrement delivery count if the client failed, since there's a possibility that refs
- // were actually delivered but we just didn't get any acks for them
- // before failure
- ref.decrementDeliveryCount();
- }
+ updateDeliveryCountForCanceledRef(ref, failed);
}
if (isTrace) {
@@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return refs;
}
+ protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+ if (!failed) {
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
+ ref.decrementDeliveryCount();
+ }
+ }
+
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
index 28ac25e..ddc6cd8 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
@@ -24,6 +24,7 @@ import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
session.close();
connection.close();
ArtemisBrokerHelper.stopArtemisBroker();
+ TcpTransportFactory.clearService();
}
/**