You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/07 17:38:59 UTC
svn commit: r712189 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/
test/java/org/apache/activemq/transport/discovery/
Author: gtully
Date: Fri Nov 7 08:38:51 2008
New Revision: 712189
URL: http://svn.apache.org/viewvc?rev=712189&view=rev
Log:
Reapplying patch from AMQ-1957 with a small amendment for transactional behaviour; reverting changes that fixed broken tests, as the tests now pass.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Nov 7 08:38:51 2008
@@ -200,13 +200,13 @@
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
- removeList.add(node);
if (!context.isInTransaction()) {
dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) {
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
}
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ removeList.add(node);
} else {
// setup a Synchronization to remove nodes from the
// dispatched list.
@@ -217,6 +217,7 @@
throws Exception {
synchronized(dispatchLock) {
dequeueCounter++;
+ dispatched.remove(node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
prefetchExtension--;
@@ -224,9 +225,7 @@
}
public void afterRollback() throws Exception {
- // Need to put it back in the front.
synchronized(dispatchLock) {
- dispatched.add(0, node);
// ActiveMQ workaround for AMQ-1730 - Please Ignore next line
node.incrementRedeliveryCounter();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
@@ -307,11 +306,9 @@
}
} else if (ack.isRedeliveredAck()) {
// Message was re-delivered but it was not yet considered to be
- // a
- // DLQ message.
+ // a DLQ message.
// Acknowledge all dispatched messages up till the message id of
- // the
- // acknowledgment.
+ // the ack.
boolean inAckRange = false;
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
@@ -396,9 +393,8 @@
+ ack + ") was not in the dispatch list: "
+ dispatched);
} else {
- LOG
- .debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
- + ack);
+ LOG.debug("Acknowledgment out of sync (Normally occurs when failover connection reconnects): "
+ + ack);
}
}
}
@@ -442,7 +438,7 @@
throw new JMSException("Unmatched acknowledege: " + ack
+ "; Could not find Message-ID " + lastAckedMsg
+ " in dispatched-list (end of ack)");
- if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
+ if (ack.getMessageCount() != checkCount && !ack.isInTransaction()) {
throw new JMSException("Unmatched acknowledege: " + ack
+ "; Expected message count (" + ack.getMessageCount()
+ ") differs from count in dispatched-list (" + checkCount
@@ -571,7 +567,7 @@
break;
}
- // Synchronize between dispatched list and remove of messageg from pending list
+ // Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
synchronized(dispatchLock) {
pending.remove();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java Fri Nov 7 08:38:51 2008
@@ -639,10 +639,14 @@
Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination);
- // no commit so will auto rollback and get redispatched to redisptachConsumer
+ // no commit so will auto rollback and get re-dispatched to redisptachConsumer
session.close();
- assertNotNull(redispatchConsumer.receive(1000));
+ Message msg = redispatchConsumer.receive(1000);
+ assertNotNull(msg);
+ assertTrue(msg.getJMSRedelivered());
+ // should have re-delivery of 2, one for re-dispatch, one for rollback which is a little too much!
+ assertEquals(3, msg.getLongProperty("JMSXDeliveryCount"));
redispatchSession.commit();
assertNull(redispatchConsumer.receive(500));
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java?rev=712189&r1=712188&r2=712189&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.java Fri Nov 7 08:38:51 2008
@@ -35,8 +35,8 @@
Connection connection = factory.createConnection();
connection.setClientID("test");
fail("Did not fail to connect as expected.");
- } catch ( JMSException expected ) {
- assertTrue("reason is java.net.UnknownHostException", expected.getCause() instanceof java.net.UnknownHostException);
+ } catch ( JMSException expected ) {
+ assertTrue("reason is java.io.IOException, was: " + expected.getCause(), expected.getCause() instanceof java.io.IOException);
}
}
}