You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/27 16:08:00 UTC

[1/2] activemq-artemis git commit: This closes #492

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 26c4680c5 -> 2135911ea


This closes #492


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2135911e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2135911e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2135911e

Branch: refs/heads/master
Commit: 2135911eaaac0fd2e5ed0d059cd9d1c6e4a9ee76
Parents: 26c4680 ada6600
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Apr 27 10:06:14 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 27 10:06:14 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 ++
 .../core/server/impl/ServerConsumerImpl.java    |  1 -
 .../org/apache/activemq/OptimizedAckTest.java   | 51 ++++++++++++--------
 .../failover/InitalReconnectDelayTest.java      |  4 +-
 4 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-505 Fix OptimizedAckTest and testCloseConsumer

Posted by cl...@apache.org.
ARTEMIS-505 Fix OptimizedAckTest and testCloseConsumer

OptimizedAckTest: Use the core API instead of the old ActiveMQ
broker API to check the message count.
JmsQueueTransactionTest#testCloseConsumer: fix a bug in
delivery when prefetchSize is 0.
InitalReconnectDelayTest: close the connection after the test.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ada6600e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ada6600e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ada6600e

Branch: refs/heads/master
Commit: ada6600ee3e49e43f48c8bae5ae07497b752ad19
Parents: 26c4680
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Apr 27 11:24:20 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 27 10:06:14 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 ++
 .../core/server/impl/ServerConsumerImpl.java    |  1 -
 .../org/apache/activemq/OptimizedAckTest.java   | 51 ++++++++++++--------
 .../failover/InitalReconnectDelayTest.java      |  4 +-
 4 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 01820d6..e2deb80 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -167,6 +167,10 @@ public class AMQConsumer {
    }
 
    public void acquireCredit(int n) throws Exception {
+      if (messagePullHandler != null) {
+         //don't acquire any credits when the pull handler controls it!!
+         return;
+      }
       int oldwindow = currentWindow.getAndAdd(n);
 
       boolean promptDelivery = oldwindow < prefetchSize;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index fac4cf3..7acc6f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -65,7 +65,6 @@ import org.apache.activemq.artemis.utils.TypedProperties;
  * Concrete implementation of a ClientConsumer.
  */
 public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
-   //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
    // Constants ------------------------------------------------------------------------------------
 
    private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
index cd50295..8da4cf7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
@@ -23,8 +23,11 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
 import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,21 +63,23 @@ public class OptimizedAckTest extends TestSupport {
          producer.send(session.createTextMessage("Hello" + i));
       }
 
-      final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
       MessageConsumer consumer = session.createConsumer(queue);
+      //check queue delivering count is 10
+      ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+      Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
 
-      assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
+      final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
+      assertTrue("deliverying count is 10", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
-            LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
-            return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+            return 10 == coreQueue.getDeliveringCount();
          }
       }));
 
       for (int i = 0; i < 6; i++) {
          javax.jms.Message msg = consumer.receive(4000);
          assertNotNull(msg);
-         assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+         assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
       }
 
       for (int i = 6; i < 10; i++) {
@@ -84,10 +89,11 @@ public class OptimizedAckTest extends TestSupport {
          assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-               return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+               return 3 == coreQueue.getDeliveringCount();
             }
          }));
       }
+
    }
 
    public void testVerySlowReceivedMessageStillInflight() throws Exception {
@@ -98,15 +104,17 @@ public class OptimizedAckTest extends TestSupport {
       for (int i = 0; i < 10; i++) {
          producer.send(session.createTextMessage("Hello" + i));
       }
-
-      final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
       MessageConsumer consumer = session.createConsumer(queue);
 
+      //check queue delivering count is 10
+      ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+      Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
+
+      final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
       assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
-            LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
-            return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+            return 10 == coreQueue.getDeliveringCount();
          }
       }));
 
@@ -114,7 +122,7 @@ public class OptimizedAckTest extends TestSupport {
          Thread.sleep(400);
          javax.jms.Message msg = consumer.receive(4000);
          assertNotNull(msg);
-         assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+         assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
       }
 
       for (int i = 6; i < 10; i++) {
@@ -125,7 +133,7 @@ public class OptimizedAckTest extends TestSupport {
          assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-               return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+               return 3 == coreQueue.getDeliveringCount();
             }
          }));
       }
@@ -142,21 +150,22 @@ public class OptimizedAckTest extends TestSupport {
          producer.send(session.createTextMessage("Hello" + i));
       }
 
-      final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
       MessageConsumer consumer = session.createConsumer(queue);
+      ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+      Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
 
+      final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
       assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
-            LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
-            return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+            return 10 == coreQueue.getDeliveringCount();
          }
       }));
 
       for (int i = 0; i < 6; i++) {
          javax.jms.Message msg = consumer.receive(4000);
          assertNotNull(msg);
-         assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+         assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
       }
 
       for (int i = 6; i < 10; i++) {
@@ -165,7 +174,7 @@ public class OptimizedAckTest extends TestSupport {
          assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-               return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+               return 3 == coreQueue.getDeliveringCount();
             }
          }));
       }
@@ -173,8 +182,8 @@ public class OptimizedAckTest extends TestSupport {
       assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() {
          @Override
          public boolean isSatisified() throws Exception {
-            LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
-            return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
+            LOG.info("inflight count: " + coreQueue.getDeliveringCount());
+            return 0 == coreQueue.getDeliveringCount();
          }
       }));
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index dad241c..763a020 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -47,9 +47,6 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
    protected EmbeddedJMS server1;
    protected EmbeddedJMS server2;
 
-//   protected BrokerService broker1;
-//   protected BrokerService broker2;
-
    @Test
    public void testInitialReconnectDelay() throws Exception {
 
@@ -82,6 +79,7 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
       //Inital reconnection should kick in and be darned close to what we expected
       LOG.info("Failover took " + (end - start) + " ms.");
       assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
+      connection.close();
    }
 
    @Test