You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/01 17:38:46 UTC

[50/52] [abbrv] activemq-artemis git commit: Fixed some redelivery tests

Fixed some redelivery tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/552be8c5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/552be8c5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/552be8c5

Branch: refs/heads/refactor-openwire
Commit: 552be8c5ffe50eaac5030174bd438bc9b7b30ee2
Parents: 4a4b682
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Feb 26 22:24:03 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 1 11:37:32 2016 -0500

----------------------------------------------------------------------
 .../openwire/amq/AMQServerConsumer.java         | 22 ++++++++++++++++++++
 .../protocol/openwire/amq/AMQServerSession.java |  7 +++++++
 .../core/server/impl/ServerConsumerImpl.java    | 16 ++++++++------
 .../activemq/test/JmsTopicSendReceiveTest.java  |  2 ++
 4 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
index 3f94351..34789b0 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerConsumer.java
@@ -35,6 +35,7 @@ public class AMQServerConsumer extends ServerConsumerImpl {
 
    // TODO-NOW: remove this once unified
    AMQConsumer amqConsumer;
+   volatile boolean isClosing; // written by closing(), read by handle(); volatile so the write is visible if those run on different threads
 
    public AMQConsumer getAmqConsumer() {
       return amqConsumer;
@@ -67,6 +68,18 @@ public class AMQServerConsumer extends ServerConsumerImpl {
       this.browserDeliverer = newBrowserDeliverer;
    }
 
+   public void closing() { // marks this consumer as closing; AMQServerSession.doClose calls it on every consumer first
+      isClosing = true; // one-way switch: nothing visible here resets it; handle() returns BUSY once it is set
+   }
+
+   @Override
+   public HandleStatus handle(final MessageReference ref) throws Exception { // gate in front of the superclass handle()
+      if (isClosing) { // set by closing()
+         return HandleStatus.BUSY; // do not take the reference; the doClose caller notes this is to prevent redelivery
+      }
+      return super.handle(ref); // not closing: unchanged superclass behaviour
+   }
+
    private class AMQBrowserDeliverer extends BrowserDeliverer {
 
       private BrowserListener listener = null;
@@ -174,4 +187,13 @@ public class AMQServerConsumer extends ServerConsumerImpl {
       }
    }
 
+   @Override
+   protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) {
+      // ActiveMQ 5 does not decrease the delivery count when the cancel is not a failure,
+      // so decrement only when failed (the inverse of the ServerConsumerImpl default).
+      if (failed) {
+         ref.decrementDeliveryCount();
+      }
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index 5403830..390b58f 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -90,6 +91,12 @@ public class AMQServerSession extends ServerSessionImpl {
 
    @Override
    protected void doClose(final boolean failed) throws Exception {
+      Set<ServerConsumer> consumersClone = new HashSet<>(consumers.values());
+      for (ServerConsumer consumer : consumersClone) {
+         AMQServerConsumer amqConsumer = (AMQServerConsumer)consumer;
+         amqConsumer.closing();//prevent redeliver
+      }
+
       synchronized (this) {
          if (tx != null && tx.getXid() == null) {
             ((AMQTransactionImpl) tx).setRollbackForClose();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index ab9dec9..08185db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -556,12 +556,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
                else {
                   refs.add(ref);
-                  if (!failed) {
-                     // We don't decrement delivery count if the client failed, since there's a possibility that refs
-                     // were actually delivered but we just didn't get any acks for them
-                     // before failure
-                     ref.decrementDeliveryCount();
-                  }
+                  updateDeliveryCountForCanceledRef(ref, failed);
                }
 
                if (isTrace) {
@@ -576,6 +571,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
       return refs;
    }
 
+   protected void updateDeliveryCountForCanceledRef(MessageReference ref, boolean failed) { // protected hook; AMQServerConsumer overrides it
+      if (!failed) {
+         // We don't decrement the delivery count if the client failed, since there's a possibility
+         // that the refs were actually delivered and we just didn't get any acks for them
+         // before the failure.
+         ref.decrementDeliveryCount();
+      }
+   }
+
    @Override
    public void setStarted(final boolean started) {
       synchronized (lock) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/552be8c5/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
index 28ac25e..ddc6cd8 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/test/JmsTopicSendReceiveTest.java
@@ -24,6 +24,7 @@ import javax.jms.Session;
 import javax.jms.Topic;
 
 import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -92,6 +93,7 @@ public class JmsTopicSendReceiveTest extends JmsSendReceiveTestSupport {
       session.close();
       connection.close();
       ArtemisBrokerHelper.stopArtemisBroker();
+      TcpTransportFactory.clearService();
    }
 
    /**