You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/11/08 09:31:40 UTC
svn commit: r472423 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/ test/java/org/apache/activemq/test/rollback/
Author: jstrachan
Date: Wed Nov 8 00:31:39 2006
New Revision: 472423
URL: http://svn.apache.org/viewvc?view=rev&rev=472423
Log:
backported fix for AMQ-1034 to 4.0.x branch
Added:
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java (with props)
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java (with props)
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Nov 8 00:31:39 2006
@@ -131,7 +131,6 @@
* @param noLocal
* @param browser
* @param dispatchAsync
- * @param value
* @throws JMSException
*/
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
@@ -699,7 +698,6 @@
* Acknowledge all the messages that have been delivered to the client upto
* this point.
*
- * @param deliverySequenceId
* @throws JMSException
*/
public void acknowledge() throws JMSException {
@@ -740,6 +738,11 @@
}
if(deliveredMessages.isEmpty())
return;
+
+ // Only increase the redelivery delay after the first redelivery..
+ if( rollbackCounter > 0 )
+ redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+
rollbackCounter++;
if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
// We need to NACK the messages so that they get sent to the
@@ -755,28 +758,28 @@
}else{
// stop the delivery of messages.
unconsumedMessages.stop();
- // Start up the delivery again a little later.
- if(redeliveryDelay==0){
- redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
- }else{
- if(redeliveryPolicy.isUseExponentialBackOff())
- redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
- }
- Scheduler.executeAfterDelay(new Runnable(){
- public void run(){
- try{
- if(started.get())
- start();
- }catch(JMSException e){
- session.connection.onAsyncException(e);
- }
- }
- },redeliveryDelay);
for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
MessageDispatch md=(MessageDispatch) iter.next();
md.getMessage().onMessageRolledBack();
unconsumedMessages.enqueueFirst(md);
}
+
+ if( redeliveryDelay > 0 ) {
+ // Start up the delivery again a little later.
+ Scheduler.executeAfterDelay(new Runnable(){
+ public void run(){
+ try{
+ if(started.get())
+ start();
+ }catch(JMSException e){
+ session.connection.onAsyncException(e);
+ }
+ }
+ },redeliveryDelay);
+ } else {
+ start();
+ }
+
}
deliveredCounter-=deliveredMessages.size();
deliveredMessages.clear();
@@ -789,30 +792,33 @@
public void dispatch(MessageDispatch md) {
MessageListener listener = this.messageListener;
try {
- if (!unconsumedMessages.isClosed()) {
- if (listener != null && unconsumedMessages.isRunning() ) {
- ActiveMQMessage message = createActiveMQMessage(md);
- beforeMessageIsConsumed(md);
- try {
- listener.onMessage(message);
- afterMessageIsConsumed(md, false);
- } catch (RuntimeException e) {
- if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
- // Redeliver the message
- } else {
- // Transacted or Client ack: Deliver the next message.
- afterMessageIsConsumed(md, false);
- }
- }
- } else {
- unconsumedMessages.enqueue(md);
- if (availableListener != null) {
- availableListener.onMessageAvailable(this);
- }
- }
+ synchronized(unconsumedMessages.getMutex()){
+ if (!unconsumedMessages.isClosed()) {
+ if (listener != null && unconsumedMessages.isRunning() ) {
+ ActiveMQMessage message = createActiveMQMessage(md);
+ beforeMessageIsConsumed(md);
+ try {
+ listener.onMessage(message);
+ afterMessageIsConsumed(md, false);
+ } catch (RuntimeException e) {
+ if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
+ // Redeliver the message
+ } else {
+ // Transacted or Client ack: Deliver the next message.
+ afterMessageIsConsumed(md, false);
+ }
+ log.warn("Exception while processing message: " + e, e);
+ }
+ } else {
+ unconsumedMessages.enqueue(md);
+ if (availableListener != null) {
+ availableListener.onMessageAvailable(this);
+ }
+ }
+ }
}
} catch (Exception e) {
- log.warn("could not process message: " + md, e);
+ session.connection.onAsyncException(e);
}
}
@@ -821,18 +827,12 @@
}
public void start() throws JMSException {
+ if (unconsumedMessages.isClosed()) {
+ return;
+ }
started.set(true);
unconsumedMessages.start();
- MessageListener listener = this.messageListener;
- if( listener!=null ) {
- MessageDispatch md;
- while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
- ActiveMQMessage message = createActiveMQMessage(md);
- beforeMessageIsConsumed(md);
- listener.onMessage(message);
- afterMessageIsConsumed(md, false);
- }
- }
+ session.executor.wakeup();
}
public void stop() {
@@ -843,5 +843,29 @@
public String toString() {
return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }";
}
+
+ /**
+ * Delivers a message to the message listener.
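+ * @return true if a queued message was dequeued and handed to the listener;
+ * false if no listener is set or no message is queued. A JMSException is reported via onAsyncException.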
+ */
+ public boolean iterate() {
+ MessageListener listener = this.messageListener;
+ if( listener!=null ) {
+ MessageDispatch md = unconsumedMessages.dequeueNoWait();
+ if( md!=null ) {
+ try {
+ ActiveMQMessage message = createActiveMQMessage(md);
+ beforeMessageIsConsumed(md);
+ listener.onMessage(message);
+ afterMessageIsConsumed(md, false);
+ } catch (JMSException e) {
+ session.connection.onAsyncException(e);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Wed Nov 8 00:31:39 2006
@@ -61,12 +61,17 @@
}
}
- private void wakeup() {
- if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) {
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ public void wakeup() {
+ if( !dispatchedBySessionPool ) {
+ if( taskRunner!=null ) {
+ try {
+ taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ while( iterate() )
+ ;
}
}
}
@@ -144,6 +149,16 @@
}
public boolean iterate() {
+
+ // Deliver any messages queued on the consumer to their listeners.
+ for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
+ ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
+ if( consumer.iterate() ) {
+ return true;
+ }
+ }
+
+ // No messages left queued on the listeners.. so now dispatch messages queued on the session
MessageDispatch message = messageQueue.dequeueNoWait();
if( message==null ) {
return false;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Wed Nov 8 00:31:39 2006
@@ -18,17 +18,24 @@
package org.apache.activemq;
import java.io.Serializable;
+import java.util.Random;
/**
* Configuration options used to control how messages are re-delivered when they
* are rolled back.
- *
+ *
+ * @org.apache.xbean.XBean element="redeliveryPolicy"
+ *
* @version $Revision: 1.11 $
*/
public class RedeliveryPolicy implements Cloneable, Serializable {
+ // +/-15% for a 30% spread -cgs
+ protected double collisionAvoidanceFactor = 0.15d;
protected int maximumRedeliveries = 5;
protected long initialRedeliveryDelay = 1000L;
+ protected static Random randomNumberGenerator;
+ protected boolean useCollisionAvoidance = false;
protected boolean useExponentialBackOff = false;
protected short backOffMultiplier = 5;
@@ -52,6 +59,14 @@
this.backOffMultiplier = backOffMultiplier;
}
+ public short getCollisionAvoidancePercent() {
+ return (short) Math.round(collisionAvoidanceFactor * 100);
+ }
+
+ public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+ this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+ }
+
public long getInitialRedeliveryDelay() {
return initialRedeliveryDelay;
}
@@ -68,6 +83,41 @@
this.maximumRedeliveries = maximumRedeliveries;
}
+ public long getRedeliveryDelay(long previousDelay) {
+ long redeliveryDelay;
+
+ if (previousDelay == 0) {
+ redeliveryDelay = initialRedeliveryDelay;
+ } else if (useExponentialBackOff && backOffMultiplier > 1) {
+ redeliveryDelay = previousDelay * backOffMultiplier;
+ } else {
+ redeliveryDelay = previousDelay;
+ }
+
+ if (useCollisionAvoidance) {
+ if (randomNumberGenerator == null) {
+ initRandomNumberGenerator();
+ }
+
+ /*
+ * First random determines +/-, second random determines how far to
+ * go in that direction. -cgs
+ */
+ double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
+ redeliveryDelay += redeliveryDelay * variance;
+ }
+
+ return redeliveryDelay;
+ }
+
+ public boolean isUseCollisionAvoidance() {
+ return useCollisionAvoidance;
+ }
+
+ public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+ this.useCollisionAvoidance = useCollisionAvoidance;
+ }
+
public boolean isUseExponentialBackOff() {
return useExponentialBackOff;
}
@@ -75,4 +125,11 @@
public void setUseExponentialBackOff(boolean useExponentialBackOff) {
this.useExponentialBackOff = useExponentialBackOff;
}
+
+ protected static synchronized void initRandomNumberGenerator() {
+ if (randomNumberGenerator == null) {
+ randomNumberGenerator = new Random();
+ }
+ }
+
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Nov 8 00:31:39 2006
@@ -121,6 +121,10 @@
prefetchExtension--;
}
}
+
+ public void afterRollback() throws Exception {
+ super.afterRollback();
+ }
});
}
index++;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java Wed Nov 8 00:31:39 2006
@@ -57,7 +57,7 @@
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
- redeliveryPolicy.setMaximumRedeliveries(2);
+ redeliveryPolicy.setMaximumRedeliveries(3);
redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
@@ -82,7 +82,7 @@
try {
log.info("Message Received: " + message);
counter++;
- if (counter <= 3) {
+ if (counter <= 4) {
log.info("Message Rollback.");
session.rollback();
} else {
@@ -119,24 +119,26 @@
} catch (InterruptedException e) {
}
- // first try
- assertEquals(1, listener.counter);
+
+ // first try.. should get 2 since there is no delay on the
+ // first redeliver..
+ assertEquals(2, listener.counter);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
- // second try (redelivery after 1 sec)
- assertEquals(2, listener.counter);
+ // 2nd redeliver (redelivery after 1 sec)
+ assertEquals(3, listener.counter);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
- // third try (redelivery after 2 seconds) - it should give up after that
- assertEquals(3, listener.counter);
+ // 3rd redeliver (redelivery after 2 seconds) - it should give up after that
+ assertEquals(4, listener.counter);
// create new message
producer.send(createTextMessage(session));
@@ -148,7 +150,7 @@
// ignore
}
// it should be committed, so no redelivery
- assertEquals(4, listener.counter);
+ assertEquals(5, listener.counter);
try {
Thread.sleep(1500);
@@ -156,7 +158,7 @@
// ignore
}
// no redelivery, counter should still be 4
- assertEquals(4, listener.counter);
+ assertEquals(5, listener.counter);
session.close();
}
@@ -184,8 +186,8 @@
} catch (InterruptedException e) {
}
- // first try
- assertEquals(1, listener.counter);
+ // first try.. should get 2 since there is no delay on the first redeliver..
+ assertEquals(2, listener.counter);
try {
Thread.sleep(1000);
@@ -193,7 +195,7 @@
}
// second try (redelivery after 1 sec)
- assertEquals(2, listener.counter);
+ assertEquals(3, listener.counter);
try {
Thread.sleep(2000);
@@ -201,7 +203,7 @@
}
// third try (redelivery after 2 seconds) - it should give up after that
- assertEquals(3, listener.counter);
+ assertEquals(4, listener.counter);
// create new message
producer.send(createTextMessage(session));
@@ -213,7 +215,7 @@
// ignore
}
// it should be committed, so no redelivery
- assertEquals(4, listener.counter);
+ assertEquals(5, listener.counter);
try {
Thread.sleep(1500);
@@ -221,7 +223,7 @@
// ignore
}
// no redelivery, counter should still be 4
- assertEquals(4, listener.counter);
+ assertEquals(5, listener.counter);
session.close();
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Wed Nov 8 00:31:39 2006
@@ -70,9 +70,15 @@
assertEquals("1st", m.getText());
session.rollback();
- // Show re-delivery delay is incrementing.
+ // No delay on first rollback..
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ session.rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100);
assertNull(m);
+
m = (TextMessage)consumer.receive(500);
assertNotNull(m);
assertEquals("1st", m.getText());
@@ -117,7 +123,12 @@
assertEquals("1st", m.getText());
session.rollback();
- // Show re-delivery delay is incrementing.
+ // No delay on first rollback..
+ m = (TextMessage)consumer.receive(100);
+ assertNotNull(m);
+ session.rollback();
+
+ // Show subsequent re-delivery delay is incrementing.
m = (TextMessage)consumer.receive(100);
assertNull(m);
m = (TextMessage)consumer.receive(500);
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java?view=auto&rev=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java Wed Nov 8 00:31:39 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+
+public class DelegatingTransactionalMessageListener implements MessageListener {
+ private static final transient Log log = LogFactory.getLog(DelegatingTransactionalMessageListener.class);
+
+ private final MessageListener underlyingListener;
+ private boolean transacted = true;
+ //private int ackMode = Session.AUTO_ACKNOWLEDGE;
+ private int ackMode = Session.SESSION_TRANSACTED;
+ private Session session;
+
+ public DelegatingTransactionalMessageListener(MessageListener underlyingListener, Connection connection, Destination destination) {
+ this.underlyingListener = underlyingListener;
+
+ try {
+ session = connection.createSession(transacted, ackMode);
+ MessageConsumer consumer = session.createConsumer(destination);
+ consumer.setMessageListener(this);
+ }
+ catch (JMSException e) {
+ throw new IllegalStateException("Could not listen to " + destination, e);
+ }
+ }
+
+ public void onMessage(Message message) {
+ try {
+ underlyingListener.onMessage(message);
+ session.commit();
+ }
+ catch (Throwable e) {
+ rollback();
+ }
+ }
+
+ private void rollback() {
+ try {
+ session.rollback();
+ }
+ catch (JMSException e) {
+ log.error("Failed to rollback: " + e, e);
+ }
+ }
+
+ public Session getSession() {
+ return session;
+ }
+}
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java?view=auto&rev=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java Wed Nov 8 00:31:39 2006
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.springframework.jms.core.MessageCreator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @version $Revision$
+ */
+public class RollbacksWhileConsumingLargeQueueTest extends
+ EmbeddedBrokerTestSupport implements MessageListener {
+
+ protected int numberOfMessagesOnQueue = 6500;
+ private Connection connection;
+ private AtomicInteger deliveryCounter = new AtomicInteger(0);
+ private AtomicInteger ackCounter = new AtomicInteger(0);
+ private CountDownLatch latch;
+ private Throwable failure;
+
+ public void testWithReciever() throws Throwable {
+ latch = new CountDownLatch(numberOfMessagesOnQueue);
+ Session session = connection.createSession(true, 0);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ long start = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - start) < 1000*1000) {
+ if (getFailure() != null) {
+ throw getFailure();
+ }
+
+ // Are we done receiving all the messages.
+ if( ackCounter.get() == numberOfMessagesOnQueue )
+ return;
+
+ Message message = consumer.receive(1000);
+ if (message == null)
+ continue;
+
+ try {
+ onMessage(message);
+ session.commit();
+ } catch (Throwable e) {
+ session.rollback();
+ }
+ }
+
+ fail("Did not receive all the messages.");
+ }
+
+ public void testWithMessageListener() throws Throwable {
+ latch = new CountDownLatch(numberOfMessagesOnQueue);
+ new DelegatingTransactionalMessageListener(this, connection,
+ destination);
+
+ long start = System.currentTimeMillis();
+ while ((System.currentTimeMillis() - start) < 1000*1000) {
+
+ if (getFailure() != null) {
+ throw getFailure();
+ }
+
+ if (latch.await(1, TimeUnit.SECONDS)) {
+ System.out.println("Received: " + deliveryCounter.get()
+ + " message(s)");
+ return;
+ }
+
+ }
+
+ fail("Did not receive all the messages.");
+ }
+
+
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ connection = createConnection();
+ connection.start();
+
+ // lets fill the queue up
+ for (int i = 0; i < numberOfMessagesOnQueue; i++) {
+ template.send(createMessageCreator(i));
+ }
+
+ }
+
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ }
+ super.tearDown();
+ }
+
+ protected MessageCreator createMessageCreator(final int i) {
+ return new MessageCreator() {
+ public Message createMessage(Session session) throws JMSException {
+ TextMessage answer = session.createTextMessage("Message: " + i);
+ answer.setIntProperty("Counter", i);
+ return answer;
+ }
+ };
+ }
+
+ public void onMessage(Message message) {
+ String msgId = null;
+ String msgText = null;
+
+ try {
+ msgId = message.getJMSMessageID();
+ msgText = ((TextMessage) message).getText();
+ } catch (JMSException e) {
+ setFailure(e);
+ }
+
+ try {
+ assertEquals("Message: " + ackCounter.get(), msgText);
+ } catch (Throwable e) {
+ setFailure(e);
+ }
+
+ int value = deliveryCounter.incrementAndGet();
+ if (value % 2 == 0) {
+ log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
+ throw new RuntimeException("Dummy exception on message: " + value);
+ }
+
+ log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
+ ackCounter.incrementAndGet();
+ latch.countDown();
+ }
+
+ public synchronized Throwable getFailure() {
+ return failure;
+ }
+
+ public synchronized void setFailure(Throwable failure) {
+ this.failure = failure;
+ }
+}
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
svn:mime-type = text/plain