You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/06/17 19:03:14 UTC
[2/2] activemq-artemis git commit: ARTEMIS-571 Fix issues in openwire testsuite
ARTEMIS-571 Fix issues in openwire testsuite
* Redelivery count fix
* Regression in BrokerTest
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/109ce6de
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/109ce6de
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/109ce6de
Branch: refs/heads/master
Commit: 109ce6ded9e095c10108404ab93bd6aa2488b87f
Parents: 279780c
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Jun 17 10:49:01 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Jun 17 14:58:54 2016 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQConsumer.java | 20 +++++++++++++++++---
.../core/protocol/openwire/amq/AMQSession.java | 3 +--
.../activemq/artemis/core/server/Queue.java | 2 ++
.../artemis/core/server/impl/QueueImpl.java | 5 +++++
.../impl/ScheduledDeliveryHandlerTest.java | 5 +++++
.../artemiswrapper/ArtemisBrokerWrapper.java | 15 ++++++++++++---
.../activemq/JMSDurableTopicRedeliverTest.java | 2 +-
.../activemq/ZeroPrefetchConsumerTest.java | 8 ++++----
.../org/apache/activemq/broker/BrokerTest.java | 8 ++++----
.../unit/core/postoffice/impl/FakeQueue.java | 6 +++++-
10 files changed, 56 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 70dfde3..17f5b79 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -43,6 +43,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.wireformat.WireFormat;
public class AMQConsumer {
@@ -271,6 +272,10 @@ public class AMQConsumer {
transaction.commit(true);
}
}
+ if (ack.isExpiredAck()) {
+ //adjust delivering count for expired messages
+ this.serverConsumer.getQueue().decDelivering(ackList.size());
+ }
}
public void browseFinished() {
@@ -314,14 +319,23 @@ public class AMQConsumer {
}
}
- public void updateDeliveryCountAfterCancel(MessageReference ref) {
+ public boolean updateDeliveryCountAfterCancel(MessageReference ref) {
long seqId = ref.getMessage().getMessageID();
long lastDelSeqId = info.getLastDeliveredSequenceId();
+ //because delivering count is always one greater than redelivery count
+ //we adjust it down before further calculating.
+ ref.decrementDeliveryCount();
+
// This is a specific rule of the protocol
- if (!(lastDelSeqId < 0 || seqId <= lastDelSeqId)) {
- ref.decrementDeliveryCount();
+ if (lastDelSeqId == RemoveInfo.LAST_DELIVERED_UNKNOWN) {
+ // this takes care of un-acked messages in non-tx deliveries
+ // tx cases are handled by
+ // org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection.CommandProcessor.processRollbackTransaction()
+ ref.incrementDeliveryCount();
}
+
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 17d3e18..472917d 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -123,8 +123,7 @@ public class AMQSession implements SessionCallback {
@Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
if (consumer.getProtocolData() != null) {
- ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
- return true;
+ return ((AMQConsumer) consumer.getProtocolData()).updateDeliveryCountAfterCancel(ref);
}
else {
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index ec9d4a3..fa39c22 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -249,4 +249,6 @@ public interface Queue extends Bindable {
* @return the user who created this queue
*/
SimpleString getUser();
+
+ void decDelivering(int size);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index eaf7656..c809abf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2854,6 +2854,11 @@ public class QueueImpl implements Queue {
deliveringCount.decrementAndGet();
}
+ @Override
+ public void decDelivering(int size) {
+ deliveringCount.addAndGet(-size);
+ }
+
private void configureExpiry(final AddressSettings settings) {
this.expiryAddress = settings == null ? null : settings.getExpiryAddress();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index c489e12..c96606d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1289,5 +1289,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
public SimpleString getUser() {
return null;
}
+
+ @Override
+ public void decDelivering(int size) {
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 17d81d7..152242e 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -41,12 +41,12 @@ import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
-
protected final Map<String, SimpleString> testQueues = new HashMap<>();
protected JMSServerManagerImpl jmsServer;
protected MBeanServer mbeanServer;
@@ -251,9 +251,18 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
}
}
- public long getAMQueueMessageCount(String physicalName) {
+ public long getAMQueueMessageCount(ActiveMQDestination amq5Dest) {
+ if (amq5Dest.isTopic()) {
+ throw new IllegalArgumentException("Method only accept queue type parameter.");
+ }
long count = 0;
- String qname = "jms.queue." + physicalName;
+ String qname = null;
+ if (amq5Dest.isTemporary()) {
+ qname = "jms.tempqueue." + amq5Dest.getPhysicalName();
+ }
+ else {
+ qname = "jms.queue." + amq5Dest.getPhysicalName();
+ }
Binding binding = server.getPostOffice().getBinding(new SimpleString(qname));
if (binding != null) {
QueueImpl q = (QueueImpl) binding.getBindable();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
index c2f48f2..5baee6c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java
@@ -57,8 +57,8 @@ public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest {
assertEquals(((TextMessage) unackMessage).getText(), text);
assertFalse(unackMessage.getJMSRedelivered());
assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1);
- consumeSession.close();
consumer.close();
+ consumeSession.close();
// receive then acknowledge
consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index a9a564b..c171e4b 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -136,9 +136,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
}
// now lets receive it
MessageConsumer consumer = session.createConsumer(queue);
- TextMessage answer = (TextMessage) consumer.receiveNoWait();
+ TextMessage answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
- answer = (TextMessage) consumer.receiveNoWait();
+ answer = (TextMessage) consumer.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
if (transacted) {
session.commit();
@@ -157,9 +157,9 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
// now lets receive it
MessageConsumer consumer1 = session.createConsumer(queue);
MessageConsumer consumer2 = session.createConsumer(queue);
- TextMessage answer = (TextMessage) consumer1.receiveNoWait();
+ TextMessage answer = (TextMessage) consumer1.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg1");
- answer = (TextMessage) consumer2.receiveNoWait();
+ answer = (TextMessage) consumer2.receive(5000);
assertEquals("Should have received a message!", answer.getText(), "Msg2");
answer = (TextMessage) consumer2.receiveNoWait();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
index 9458ae3..dbe90e7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java
@@ -459,7 +459,7 @@ public class BrokerTest extends BrokerTestSupport {
//due to async tx operations, we need some time for message count to go down
Thread.sleep(1000);
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
- long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+ long messageCount = wrapper.getAMQueueMessageCount(destination);
// The queue should now only have the remaining 2 messages
assertEquals(2, messageCount);
@@ -1473,15 +1473,15 @@ public class BrokerTest extends BrokerTestSupport {
assertEquals(m.getMessageId(), message1.getMessageId());
ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
- long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+ long messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE));
- messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+ messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 2);
connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE));
//give some time for broker to count down
Thread.sleep(2000);
- messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName());
+ messageCount = wrapper.getAMQueueMessageCount(destination);
assertTrue(messageCount == 1);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/109ce6de/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 78659d2..250d211 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -570,4 +570,8 @@ public class FakeQueue implements Queue {
public SimpleString getUser() {
return null;
}
-}
\ No newline at end of file
+
+ @Override
+ public void decDelivering(int size) {
+ }
+}