You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/04/27 16:08:01 UTC
[2/2] activemq-artemis git commit: ARTEMIS-505 Fix OptimizedAckTest
and testCloseConsumer
ARTEMIS-505 Fix OptimizedAckTest and testCloseConsumer
OptimizedAckTest: Use the Artemis core API instead of the old ActiveMQ
broker API to check the message count.
JmsQueueTransactionTest#testCloseConsumer: fix a bug in
delivery when prefetchSize is 0.
InitalReconnectDelayTest: close the connection after the test.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ada6600e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ada6600e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ada6600e
Branch: refs/heads/master
Commit: ada6600ee3e49e43f48c8bae5ae07497b752ad19
Parents: 26c4680
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Apr 27 11:24:20 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Apr 27 10:06:14 2016 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQConsumer.java | 4 ++
.../core/server/impl/ServerConsumerImpl.java | 1 -
.../org/apache/activemq/OptimizedAckTest.java | 51 ++++++++++++--------
.../failover/InitalReconnectDelayTest.java | 4 +-
4 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 01820d6..e2deb80 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -167,6 +167,10 @@ public class AMQConsumer {
}
public void acquireCredit(int n) throws Exception {
+ if (messagePullHandler != null) {
+ //don't acquire any credits when the pull handler controls it!!
+ return;
+ }
int oldwindow = currentWindow.getAndAdd(n);
boolean promptDelivery = oldwindow < prefetchSize;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index fac4cf3..7acc6f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -65,7 +65,6 @@ import org.apache.activemq.artemis.utils.TypedProperties;
* Concrete implementation of a ClientConsumer.
*/
public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
- //private static final DebugLogger logger = DebugLogger.getLogger("redelivery.log");
// Constants ------------------------------------------------------------------------------------
private static boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
index cd50295..8da4cf7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OptimizedAckTest.java
@@ -23,8 +23,11 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import org.apache.activemq.broker.BrokerRegistry;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
+import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,21 +63,23 @@ public class OptimizedAckTest extends TestSupport {
producer.send(session.createTextMessage("Hello" + i));
}
- final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue);
+ //check queue delivering count is 10
+ ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+ Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
- assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
+ final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
+ assertTrue("deliverying count is 10", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
- return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 10 == coreQueue.getDeliveringCount();
}
}));
for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
- assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+ assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
}
for (int i = 6; i < 10; i++) {
@@ -84,10 +89,11 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 3 == coreQueue.getDeliveringCount();
}
}));
}
+
}
public void testVerySlowReceivedMessageStillInflight() throws Exception {
@@ -98,15 +104,17 @@ public class OptimizedAckTest extends TestSupport {
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("Hello" + i));
}
-
- final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue);
+ //check queue delivering count is 10
+ ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+ Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
+
+ final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
- return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 10 == coreQueue.getDeliveringCount();
}
}));
@@ -114,7 +122,7 @@ public class OptimizedAckTest extends TestSupport {
Thread.sleep(400);
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
- assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+ assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
}
for (int i = 6; i < 10; i++) {
@@ -125,7 +133,7 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 3 == coreQueue.getDeliveringCount();
}
}));
}
@@ -142,21 +150,22 @@ public class OptimizedAckTest extends TestSupport {
producer.send(session.createTextMessage("Hello" + i));
}
- final RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().findFirst().getRegionBroker();
MessageConsumer consumer = session.createConsumer(queue);
+ ArtemisBrokerWrapper broker = (ArtemisBrokerWrapper) ArtemisBrokerHelper.getBroker().getBroker();
+ Binding binding = broker.getServer().getPostOffice().getBinding(new SimpleString("jms.queue.test"));
+ final QueueImpl coreQueue = (QueueImpl) binding.getBindable();
assertTrue("prefetch full", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
- return 10 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 10 == coreQueue.getDeliveringCount();
}
}));
for (int i = 0; i < 6; i++) {
javax.jms.Message msg = consumer.receive(4000);
assertNotNull(msg);
- assertEquals("all prefetch is still in flight: " + i, 10, regionBroker.getDestinationStatistics().getInflight().getCount());
+ assertEquals("all prefetch is still in flight: " + i, 10, coreQueue.getDeliveringCount());
}
for (int i = 6; i < 10; i++) {
@@ -165,7 +174,7 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("most are acked but 3 remain", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- return 3 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ return 3 == coreQueue.getDeliveringCount();
}
}));
}
@@ -173,8 +182,8 @@ public class OptimizedAckTest extends TestSupport {
assertTrue("After delay the scheduled ack should ack all inflight.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
- LOG.info("inflight count: " + regionBroker.getDestinationStatistics().getInflight().getCount());
- return 0 == regionBroker.getDestinationStatistics().getInflight().getCount();
+ LOG.info("inflight count: " + coreQueue.getDeliveringCount());
+ return 0 == coreQueue.getDeliveringCount();
}
}));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ada6600e/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index dad241c..763a020 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -47,9 +47,6 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
protected EmbeddedJMS server1;
protected EmbeddedJMS server2;
-// protected BrokerService broker1;
-// protected BrokerService broker2;
-
@Test
public void testInitialReconnectDelay() throws Exception {
@@ -82,6 +79,7 @@ public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
//Inital reconnection should kick in and be darned close to what we expected
LOG.info("Failover took " + (end - start) + " ms.");
assertTrue("Failover took " + (end - start) + " ms and should be > 14000.", (end - start) > 14000);
+ connection.close();
}
@Test