You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/17 19:03:13 UTC

[1/2] activemq-artemis git commit: This closes #581

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 279780c89 -> 07b57e524


This closes #581


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/07b57e52
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/07b57e52
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/07b57e52

Branch: refs/heads/master
Commit: 07b57e524a0ec01b483d0d1003be0a4fe31df56c
Parents: 279780c 109ce6d
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Jun 17 14:58:54 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 17 14:58:54 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java | 20 +++++++++++++++++---
 .../core/protocol/openwire/amq/AMQSession.java  |  3 +--
 .../activemq/artemis/core/server/Queue.java     |  2 ++
 .../artemis/core/server/impl/QueueImpl.java     |  5 +++++
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 +++++
 .../artemiswrapper/ArtemisBrokerWrapper.java    | 15 ++++++++++++---
 .../activemq/JMSDurableTopicRedeliverTest.java  |  2 +-
 .../activemq/ZeroPrefetchConsumerTest.java      |  8 ++++----
 .../org/apache/activemq/broker/BrokerTest.java  |  8 ++++----
 .../unit/core/postoffice/impl/FakeQueue.java    |  6 +++++-
 10 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-571 Fix issues in openwire testsuite

Posted by cl...@apache.org.
ARTEMIS-571 Fix issues in openwire testsuite

* Redelivery count fix
* Regression in BrokerTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/109ce6de
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/109ce6de
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/109ce6de

Branch: refs/heads/master
Commit: 109ce6ded9e095c10108404ab93bd6aa2488b87f
Parents: 279780c
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Jun 17 10:49:01 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 17 14:58:54 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java | 20 +++++++++++++++++---
 .../core/protocol/openwire/amq/AMQSession.java  |  3 +--
 .../activemq/artemis/core/server/Queue.java     |  2 ++
 .../artemis/core/server/impl/QueueImpl.java     |  5 +++++
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 +++++
 .../artemiswrapper/ArtemisBrokerWrapper.java    | 15 ++++++++++++---
 .../activemq/JMSDurableTopicRedeliverTest.java  |  2 +-
 .../activemq/ZeroPrefetchConsumerTest.java      |  8 ++++----
 .../org/apache/activemq/broker/BrokerTest.java  |  8 ++++----
 .../unit/core/postoffice/impl/FakeQueue.java    |  6 +++++-
 10 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 70dfde3..17f5b79 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -43,6 +43,7 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQConsumer {
@@ -271,6 +272,10 @@ public class AMQConsumer {
             transaction.commit(true);
          }
       }
+      if (ack.isExpiredAck()) {
+         //adjust delivering count for expired messages
+         this.serverConsumer.getQueue().decDelivering(ackList.size());
+      }
    }
 
    public void browseFinished() {
@@ -314,14 +319,23 @@ public class AMQConsumer {
       }
    }
 
-   public void updateDeliveryCountAfterCancel(MessageReference ref) {
+   public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
       long seqId = ref.getMessage().getMessageID();
       long lastDelSeqId = info.getLastDeliveredSequenceId();
 
+      //because delivering count is always one greater than redelivery count
+      //we adjust it down before further calculating.
+      ref.decrementDeliveryCount();
+
       // This is a specific rule of the protocol
-      if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
-         ref.decrementDeliveryCount();
+      if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
+         // this takes care of un-acked messages in non-tx deliveries
+         // tx cases are handled by
+         // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
+         ref.incrementDeliveryCount();
       }
+
+      return true;
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 17d3e18..472917d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -123,8 +123,7 @@ public class AMQSession implements SessionCallback {
    @Override
    public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
       if (consumer.getProtocolData() != null) {
-         ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
-         return true;
+         return ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
       }
       else {
          return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ec9d4a3..fa39c22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -249,4 +249,6 @@ public interface Queue extends Bindable {
     * @return the user who created this queue
     */
    SimpleString getUser();
+
+   void decDelivering(int size);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index eaf7656..c809abf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2854,6 +2854,11 @@ public class QueueImpl implements Queue {
       deliveringCount.decrementAndGet();
    }
 
+   @Override
+   public void decDelivering(int size) {
+      deliveringCount.addAndGet(-size);
+   }
+
    private void configureExpiry(final AddressSettings settings) {
       this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index c489e12..c96606d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1289,5 +1289,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       public SimpleString getUser() {
          return null;
       }
+
+      @Override
+      public void decDelivering(int size) {
+
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 17d81d7..152242e 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -41,12 +41,12 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
 
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 
 public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
-
    protected final Map<String, SimpleString> testQueues = new HashMap<>();
    protected JMSServerManagerImpl jmsServer;
    protected MBeanServer mbeanServer;
@@ -251,9 +251,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
       }
    }
 
-   public long getAMQueueMessageCount(String physicalName) {
+   public long getAMQueueMessageCount(ActiveMQDestination amq5Dest) {
+      if (amq5Dest.isTopic()) {
+         throw new IllegalArgumentException("Method only accept queue type parameter.");
+      }
       long count = 0;
-      String qname = "jms.queue." + physicalName;
+      String qname = null;
+      if (amq5Dest.isTemporary()) {
+         qname = "jms.tempqueue." + amq5Dest.getPhysicalName();
+      }
+      else {
+         qname = "jms.queue." + amq5Dest.getPhysicalName();
+      }
       Binding binding = server.getPostOffice().getBinding(new SimpleString(qname));
       if (binding != null) {
          QueueImpl q = (QueueImpl) binding.getBindable();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
index c2f48f2..5baee6c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
@@ -57,8 +57,8 @@ public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
       assertEquals(((TextMessage) unackMessage).getText(), text);
       assertFalse(unackMessage.getJMSRedelivered());
       assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1);
-      consumeSession.close();
       consumer.close();
+      consumeSession.close();
 
       // receive then acknowledge
       consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index a9a564b..c171e4b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -136,9 +136,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       }
       // now lets receive it
       MessageConsumer consumer = session.createConsumer(queue);
-      TextMessage answer = (TextMessage) consumer.receiveNoWait();
+      TextMessage answer = (TextMessage) consumer.receive(5000);
       assertEquals("Should have received a message!", answer.getText(), "Msg1");
-      answer = (TextMessage) consumer.receiveNoWait();
+      answer = (TextMessage) consumer.receive(5000);
       assertEquals("Should have received a message!", answer.getText(), "Msg2");
       if (transacted) {
          session.commit();
@@ -157,9 +157,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
       // now lets receive it
       MessageConsumer consumer1 = session.createConsumer(queue);
       MessageConsumer consumer2 = session.createConsumer(queue);
-      TextMessage answer = (TextMessage) consumer1.receiveNoWait();
+      TextMessage answer = (TextMessage) consumer1.receive(5000);
       assertEquals("Should have received a message!", answer.getText(), "Msg1");
-      answer = (TextMessage) consumer2.receiveNoWait();
+      answer = (TextMessage) consumer2.receive(5000);
       assertEquals("Should have received a message!", answer.getText(), "Msg2");
 
       answer = (TextMessage) consumer2.receiveNoWait();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 9458ae3..dbe90e7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -459,7 +459,7 @@ public class BrokerTest extends BrokerTestSupport {
       //due to async tx operations, we need some time for message count to go down
       Thread.sleep(1000);
       ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
-      long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+      long messageCount = wrapper.getAMQueueMessageCount(destination);
 
       // The queue should now only have the remaining 2 messages
       assertEquals(2, messageCount);
@@ -1473,15 +1473,15 @@ public class BrokerTest extends BrokerTestSupport {
       assertEquals(m.getMessageId(), message1.getMessageId());
 
       ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
-      long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+      long messageCount = wrapper.getAMQueueMessageCount(destination);
       assertTrue(messageCount == 2);
       connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
-      messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+      messageCount = wrapper.getAMQueueMessageCount(destination);
       assertTrue(messageCount == 2);
       connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
       //give some time for broker to count down
       Thread.sleep(2000);
-      messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+      messageCount = wrapper.getAMQueueMessageCount(destination);
       assertTrue(messageCount == 1);
 
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 78659d2..250d211 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -570,4 +570,8 @@ public class FakeQueue implements Queue {
    public SimpleString getUser() {
       return null;
    }
-}
\ No newline at end of file
+
+   @Override
+   public void decDelivering(int size) {
+   }
+}