You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/01/18 14:19:28 UTC
svn commit: r900390 - in /activemq/branches/activemq-5.3/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/policy/
test/java/org/apache/activemq/transport/failover/
Author: gtully
Date: Mon Jan 18 13:19:27 2010
New Revision: 900390
URL: http://svn.apache.org/viewvc?rev=900390&view=rev
Log:
merge -c 898797 https://svn.apache.org/repos/asf/activemq/trunk - resolve https://issues.apache.org/activemq/browse/AMQ-2566, inflight out of sync after delivery to dlq. Also tidy failover transaction test from: https://issues.apache.org/activemq/browse/AMQ-2560
Added:
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
- copied unchanged from r898797, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/policy/NoRetryDeadLetterTest.java
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Jan 18 13:19:27 2010
@@ -199,7 +199,7 @@
}
}
if (LOG.isTraceEnabled()) {
- LOG.info("ack:" + ack);
+ LOG.trace("ack:" + ack);
}
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
@@ -241,7 +241,11 @@
public void afterRollback() throws Exception {
synchronized(dispatchLock) {
- node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ if (isSlave()) {
+ node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ } else {
+ // poisonAck will decrement - otherwise still inflight on client
+ }
}
}
});
@@ -362,8 +366,7 @@
if (inAckRange) {
sendToDLQ(context, node);
node.getRegionDestination().getDestinationStatistics()
- .getInflight().increment();
-
+ .getInflight().decrement();
removeList.add(node);
dequeueCounter++;
index++;
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTest.java Mon Jan 18 13:19:27 2010
@@ -57,6 +57,7 @@
assertMessage(msg, i);
assertNotNull("Should be a DLQ message for loop: " + i, msg);
}
+ session.commit();
}
protected void consumeAndRollback(int messageCounter) throws Exception {
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/broker/policy/DeadLetterTestSupport.java Mon Jan 18 13:19:27 2010
@@ -31,6 +31,8 @@
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -169,13 +171,15 @@
deliveryMode = DeliveryMode.NON_PERSISTENT;
durableSubscriber = false;
doTest();
+ validateConsumerPrefetch(this.getDestinationString(), 0);
}
-
+
public void testDurableQueueMessage() throws Exception {
super.topic = false;
deliveryMode = DeliveryMode.PERSISTENT;
durableSubscriber = false;
doTest();
+ validateConsumerPrefetch(this.getDestinationString(), 0);
}
public Destination getDestination() {
@@ -184,4 +188,16 @@
}
return destination;
}
+
+ private void validateConsumerPrefetch(String destination, long expectedCount) {
+ RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+ for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
+ if (dest.getName().equals(destination)) {
+ DestinationStatistics stats = dest.getDestinationStatistics();
+ LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
+ assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
+ expectedCount, stats.getInflight().getCount());
+ }
+ }
+ }
}
Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=900390&r1=900389&r2=900390&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Mon Jan 18 13:19:27 2010
@@ -467,8 +467,8 @@
TimeUnit.SECONDS.sleep(7);
// should not get a second message as there are two messages and two consumers
- // but with failover and unordered connection reinit it can get the second
- // message which will have a problem for the ack
+ // but with failover and unordered connection restore it can get the second
+ // message which could create a problem for a pending ack
msg = consumer1.receive(5000);
LOG.info("consumer1 second attempt got message: " + msg);
if (msg != null) {
@@ -503,10 +503,12 @@
assertNull("should be nothing left for consumer1", msg);
consumerSession1.commit();
- // consumer2 should get other message
+ // consumer2 should get the other message, provided consumer1 did not get both
msg = consumer2.receive(5000);
LOG.info("post: from consumer2 received: " + msg);
- assertNotNull("got message on consumer2", msg);
+ if (receivedMessages.size() == 1) {
+ assertNotNull("got second message on consumer2", msg);
+ }
consumerSession2.commit();
for (Connection c: connections) {
@@ -532,7 +534,7 @@
if (msg == null) {
msg = sweeper.receive(5000);
}
- LOG.info("Received: " + msg);
+ LOG.info("Sweep received: " + msg);
assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}