You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/11/08 09:31:40 UTC

svn commit: r472423 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/ test/java/org/apache/activemq/test/rollback/

Author: jstrachan
Date: Wed Nov  8 00:31:39 2006
New Revision: 472423

URL: http://svn.apache.org/viewvc?view=rev&rev=472423
Log:
backported fix for AMQ-1034 to 4.0.x branch

Added:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java   (with props)
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java   (with props)
Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Wed Nov  8 00:31:39 2006
@@ -131,7 +131,6 @@
      * @param noLocal
      * @param browser
      * @param dispatchAsync
-     * @param value
      * @throws JMSException
      */
     public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
@@ -699,7 +698,6 @@
      * Acknowledge all the messages that have been delivered to the client upto
      * this point.
      * 
-     * @param deliverySequenceId
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
@@ -740,6 +738,11 @@
             }
             if(deliveredMessages.isEmpty())
                 return;
+
+            // Only increase the redelivery delay after the first redelivery.
+            if( rollbackCounter > 0 )
+            	redeliveryDelay = redeliveryPolicy.getRedeliveryDelay(redeliveryDelay);
+
             rollbackCounter++;
             if(rollbackCounter>redeliveryPolicy.getMaximumRedeliveries()){
                 // We need to NACK the messages so that they get sent to the
@@ -755,28 +758,28 @@
             }else{
                 // stop the delivery of messages.
                 unconsumedMessages.stop();
-                // Start up the delivery again a little later.
-                if(redeliveryDelay==0){
-                    redeliveryDelay=redeliveryPolicy.getInitialRedeliveryDelay();
-                }else{
-                    if(redeliveryPolicy.isUseExponentialBackOff())
-                        redeliveryDelay*=redeliveryPolicy.getBackOffMultiplier();
-                }
-                Scheduler.executeAfterDelay(new Runnable(){
-                    public void run(){
-                        try{
-                            if(started.get())
-                                start();
-                        }catch(JMSException e){
-                            session.connection.onAsyncException(e);
-                        }
-                    }
-                },redeliveryDelay);
                 for(Iterator iter=deliveredMessages.iterator();iter.hasNext();){
                     MessageDispatch md=(MessageDispatch) iter.next();
                     md.getMessage().onMessageRolledBack();
                     unconsumedMessages.enqueueFirst(md);
                 }
+                                
+                if( redeliveryDelay > 0 ) {
+                    // Start up the delivery again a little later.
+	                Scheduler.executeAfterDelay(new Runnable(){
+	                    public void run(){
+	                        try{
+	                            if(started.get())
+	                                start();
+	                        }catch(JMSException e){
+	                            session.connection.onAsyncException(e);
+	                        }
+	                    }
+	                },redeliveryDelay);
+                } else {
+                	start();
+                }
+
             }
             deliveredCounter-=deliveredMessages.size();
             deliveredMessages.clear();
@@ -789,30 +792,33 @@
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener;
         try {
-            if (!unconsumedMessages.isClosed()) {
-                if (listener != null && unconsumedMessages.isRunning() ) {
-                    ActiveMQMessage message = createActiveMQMessage(md);
-                    beforeMessageIsConsumed(md);
-                    try {
-                        listener.onMessage(message);
-                        afterMessageIsConsumed(md, false);
-                    } catch (RuntimeException e) {
-                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
-                            // Redeliver the message
-                        } else {
-                            // Transacted or Client ack: Deliver the next message.
-                            afterMessageIsConsumed(md, false);
-                        }
-                    }
-                } else {
-                    unconsumedMessages.enqueue(md);
-                    if (availableListener != null) {
-                        availableListener.onMessageAvailable(this);
-                    }
-                }
+            synchronized(unconsumedMessages.getMutex()){
+	            if (!unconsumedMessages.isClosed()) {
+	                if (listener != null && unconsumedMessages.isRunning() ) {
+	                    ActiveMQMessage message = createActiveMQMessage(md);
+	                    beforeMessageIsConsumed(md);
+	                    try {
+	                        listener.onMessage(message);
+	                        afterMessageIsConsumed(md, false);
+	                    } catch (RuntimeException e) {
+	                        if ( session.isDupsOkAcknowledge() || session.isAutoAcknowledge() ) {
+	                            // Redeliver the message
+	                        } else {
+	                            // Transacted or Client ack: Deliver the next message.
+	                            afterMessageIsConsumed(md, false);
+	                        }
+	                        log.warn("Exception while processing message: " + e, e);
+	                    }
+	                } else {
+	                    unconsumedMessages.enqueue(md);
+	                    if (availableListener != null) {
+	                        availableListener.onMessageAvailable(this);
+	                    }
+	                }
+	            }
             }
         } catch (Exception e) {
-            log.warn("could not process message: " + md, e);
+        	session.connection.onAsyncException(e);
         }
     }
 
@@ -821,18 +827,12 @@
     }
 
     public void start() throws JMSException {
+    	if (unconsumedMessages.isClosed()) {
+    		return;
+    	}    	
         started.set(true);
         unconsumedMessages.start();
-        MessageListener listener = this.messageListener;
-        if( listener!=null ) {
-            MessageDispatch md;
-            while( (md = unconsumedMessages.dequeueNoWait())!=null ) {
-                ActiveMQMessage message = createActiveMQMessage(md);
-                beforeMessageIsConsumed(md);
-                listener.onMessage(message);
-                afterMessageIsConsumed(md, false);
-            }
-        }
+        session.executor.wakeup();
     }
 
     public void stop() {
@@ -843,5 +843,29 @@
     public String toString() {
         return "ActiveMQMessageConsumer { value=" +info.getConsumerId()+", started=" +started.get()+" }";
     }
+
+    /**
+     * Delivers a message to the message listener.
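+     * @return true if a message was dequeued for delivery to the listener, false otherwise;
+     *         a JMSException during delivery is passed to the connection's onAsyncException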
+     */
+	public boolean iterate() {
+		MessageListener listener = this.messageListener;
+		if( listener!=null ) {
+		    MessageDispatch md = unconsumedMessages.dequeueNoWait();
+		    if( md!=null ) {
+		        try {
+			        ActiveMQMessage message = createActiveMQMessage(md);
+			        beforeMessageIsConsumed(md);
+			        listener.onMessage(message);
+			        afterMessageIsConsumed(md, false);
+				} catch (JMSException e) {
+		        	session.connection.onAsyncException(e);
+				}
+		        return true;
+		    }
+		}
+    	return false;
+	}
 
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java Wed Nov  8 00:31:39 2006
@@ -61,12 +61,17 @@
         }
     }
 
-    private void wakeup() {
-        if( taskRunner!=null && !dispatchedBySessionPool && hasUncomsumedMessages() ) {
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
+    public void wakeup() {
+        if( !dispatchedBySessionPool ) {
+            if( taskRunner!=null ) {
+                try {
+                    taskRunner.wakeup();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            } else {
+                while( iterate() )
+                    ;
             }
         }
     }
@@ -144,6 +149,16 @@
     }
 
     public boolean iterate() {
+
+    	// Deliver any messages queued on the consumer to their listeners.
+    	for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
+        	if( consumer.iterate() ) {
+        		return true;
+        	}
+        }
+    	
+    	// No messages left queued on the listeners.. so now dispatch messages queued on the session
         MessageDispatch message = messageQueue.dequeueNoWait();
         if( message==null ) {
             return false;

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/RedeliveryPolicy.java Wed Nov  8 00:31:39 2006
@@ -18,17 +18,24 @@
 package org.apache.activemq;
 
 import java.io.Serializable;
+import java.util.Random;
 
 /**
  * Configuration options used to control how messages are re-delivered when they
  * are rolled back.
- * 
+ *
+ * @org.apache.xbean.XBean element="redeliveryPolicy"
+ *
  * @version $Revision: 1.11 $
  */
 public class RedeliveryPolicy implements Cloneable, Serializable {
 
+    // +/-15% for a 30% spread -cgs
+    protected double collisionAvoidanceFactor = 0.15d;
     protected int maximumRedeliveries = 5;
     protected long initialRedeliveryDelay = 1000L;
+    protected static Random randomNumberGenerator;
+    protected boolean useCollisionAvoidance = false;
     protected boolean useExponentialBackOff = false;
     protected short backOffMultiplier = 5;
 
@@ -52,6 +59,14 @@
         this.backOffMultiplier = backOffMultiplier;
     }
 
+    public short getCollisionAvoidancePercent() {
+        return (short) Math.round(collisionAvoidanceFactor * 100);
+    }
+
+    public void setCollisionAvoidancePercent(short collisionAvoidancePercent) {
+        this.collisionAvoidanceFactor = collisionAvoidancePercent * 0.01d;
+    }
+
     public long getInitialRedeliveryDelay() {
         return initialRedeliveryDelay;
     }
@@ -68,6 +83,41 @@
         this.maximumRedeliveries = maximumRedeliveries;
     }
 
+    public long getRedeliveryDelay(long previousDelay) {
+        long redeliveryDelay;
+
+        if (previousDelay == 0) {
+            redeliveryDelay = initialRedeliveryDelay;
+        } else if (useExponentialBackOff && backOffMultiplier > 1) {
+            redeliveryDelay = previousDelay * backOffMultiplier;
+        } else {
+            redeliveryDelay = previousDelay;
+        }
+
+        if (useCollisionAvoidance) {
+            if (randomNumberGenerator == null) {
+                initRandomNumberGenerator();
+            }
+
+            /*
+             * First random determines +/-, second random determines how far to
+             * go in that direction. -cgs
+             */
+            double variance = (randomNumberGenerator.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * randomNumberGenerator.nextDouble();
+            redeliveryDelay += redeliveryDelay * variance;
+        }
+
+        return redeliveryDelay;
+    }
+
+    public boolean isUseCollisionAvoidance() {
+        return useCollisionAvoidance;
+    }
+
+    public void setUseCollisionAvoidance(boolean useCollisionAvoidance) {
+        this.useCollisionAvoidance = useCollisionAvoidance;
+    }
+
     public boolean isUseExponentialBackOff() {
         return useExponentialBackOff;
     }
@@ -75,4 +125,11 @@
     public void setUseExponentialBackOff(boolean useExponentialBackOff) {
         this.useExponentialBackOff = useExponentialBackOff;
     }
+
+    protected static synchronized void initRandomNumberGenerator() {
+        if (randomNumberGenerator == null) {
+            randomNumberGenerator = new Random();
+        }
+    }
+
 }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Nov  8 00:31:39 2006
@@ -121,6 +121,10 @@
                                     prefetchExtension--;
                                 }
                             }
+
+                            public void afterRollback() throws Exception {
+                            	super.afterRollback();
+                            }
                         });
                     }
                     index++;

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/MessageListenerRedeliveryTest.java Wed Nov  8 00:31:39 2006
@@ -57,7 +57,7 @@
     protected RedeliveryPolicy getRedeliveryPolicy() {
         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
         redeliveryPolicy.setInitialRedeliveryDelay(1000);
-        redeliveryPolicy.setMaximumRedeliveries(2);
+        redeliveryPolicy.setMaximumRedeliveries(3);
         redeliveryPolicy.setBackOffMultiplier((short) 2);
         redeliveryPolicy.setUseExponentialBackOff(true);
         return redeliveryPolicy;
@@ -82,7 +82,7 @@
             try {
                 log.info("Message Received: " + message);
                 counter++;
-                if (counter <= 3) {
+                if (counter <= 4) {
                     log.info("Message Rollback.");
                     session.rollback();
                 } else {
@@ -119,24 +119,26 @@
         } catch (InterruptedException e) {
 
         }
-        // first try
-        assertEquals(1, listener.counter);
+        
+        // first try: the counter should already be 2 since there is no delay
+        // on the first redelivery
+        assertEquals(2, listener.counter);
 
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
 
         }
-        // second try (redelivery after 1 sec)
-        assertEquals(2, listener.counter);
+        // 2nd redelivery (after 1 sec)
+        assertEquals(3, listener.counter);
 
         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
 
         }
-        // third try (redelivery after 2 seconds) - it should give up after that
-        assertEquals(3, listener.counter);
+        // 3rd redelivery (after 2 seconds) - it should give up after that
+        assertEquals(4, listener.counter);
 
         // create new message
         producer.send(createTextMessage(session));
@@ -148,7 +150,7 @@
             // ignore
         }
         // it should be committed, so no redelivery
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         try {
             Thread.sleep(1500);
@@ -156,7 +158,7 @@
             // ignore
         }
         // no redelivery, counter should still be 4
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         session.close();
     }
@@ -184,8 +186,8 @@
         } catch (InterruptedException e) {
 
         }
-        // first try
-        assertEquals(1, listener.counter);
+        // first try plus the immediate (no-delay) first redelivery
+        assertEquals(2, listener.counter);
 
         try {
             Thread.sleep(1000);
@@ -193,7 +195,7 @@
 
         }
         // second try (redelivery after 1 sec)
-        assertEquals(2, listener.counter);
+        assertEquals(3, listener.counter);
 
         try {
             Thread.sleep(2000);
@@ -201,7 +203,7 @@
 
         }
         // third try (redelivery after 2 seconds) - it should give up after that
-        assertEquals(3, listener.counter);
+        assertEquals(4, listener.counter);
 
         // create new message
         producer.send(createTextMessage(session));
@@ -213,7 +215,7 @@
             // ignore
         }
         // it should be committed, so no redelivery
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         try {
             Thread.sleep(1500);
@@ -221,7 +223,7 @@
             // ignore
         }
         // no redelivery, counter should still be 4
-        assertEquals(4, listener.counter);
+        assertEquals(5, listener.counter);
 
         session.close();
     }

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java?view=diff&rev=472423&r1=472422&r2=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java Wed Nov  8 00:31:39 2006
@@ -70,9 +70,15 @@
         assertEquals("1st", m.getText());        
         session.rollback();
 
-        // Show re-delivery delay is incrementing.
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+        
+        // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
+        
         m = (TextMessage)consumer.receive(500);
         assertNotNull(m);
         assertEquals("1st", m.getText());        
@@ -117,7 +123,12 @@
         assertEquals("1st", m.getText());        
         session.rollback();
 
-        // Show re-delivery delay is incrementing.
+        // No delay on first rollback..
+        m = (TextMessage)consumer.receive(100);
+        assertNotNull(m);
+        session.rollback();
+        
+        // Show subsequent re-delivery delay is incrementing.
         m = (TextMessage)consumer.receive(100);
         assertNull(m);
         m = (TextMessage)consumer.receive(500);

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java?view=auto&rev=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java Wed Nov  8 00:31:39 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+
+public class DelegatingTransactionalMessageListener implements MessageListener {
+    private static final transient Log log = LogFactory.getLog(DelegatingTransactionalMessageListener.class);
+
+    private final MessageListener underlyingListener;
+    private boolean transacted = true;
+    //private int ackMode = Session.AUTO_ACKNOWLEDGE;
+    private int ackMode = Session.SESSION_TRANSACTED;
+    private Session session;
+
+    public DelegatingTransactionalMessageListener(MessageListener underlyingListener, Connection connection, Destination destination) {
+        this.underlyingListener = underlyingListener;
+
+        try {
+            session = connection.createSession(transacted, ackMode);
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(this);
+        }
+        catch (JMSException e) {
+            throw new IllegalStateException("Could not listen to " + destination, e);
+        }
+    }
+
+    public void onMessage(Message message) {
+        try {
+            underlyingListener.onMessage(message);
+            session.commit();
+        }
+        catch (Throwable e) {
+            rollback();
+        }
+    }
+
+    private void rollback() {
+        try {
+            session.rollback();
+        }
+        catch (JMSException e) {
+            log.error("Failed to rollback: " + e, e);
+        }
+    }
+
+    public Session getSession() {
+        return session;
+    }
+}

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/DelegatingTransactionalMessageListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java?view=auto&rev=472423
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java (added)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java Wed Nov  8 00:31:39 2006
@@ -0,0 +1,167 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.test.rollback;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.springframework.jms.core.MessageCreator;
+
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * @version $Revision$
+ */
+public class RollbacksWhileConsumingLargeQueueTest extends
+		EmbeddedBrokerTestSupport implements MessageListener {
+
+	protected int numberOfMessagesOnQueue = 6500;
+	private Connection connection;
+	private AtomicInteger deliveryCounter = new AtomicInteger(0);
+	private AtomicInteger ackCounter = new AtomicInteger(0);
+	private CountDownLatch latch;
+	private Throwable failure;
+
+	public void testWithReciever() throws Throwable {
+		latch = new CountDownLatch(numberOfMessagesOnQueue);
+		Session session = connection.createSession(true, 0);
+		MessageConsumer consumer = session.createConsumer(destination);
+
+		long start = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - start) < 1000*1000) {
+			if (getFailure() != null) {
+				throw getFailure();
+			}
+			
+			// Are we done receiving all the messages?
+			if( ackCounter.get() == numberOfMessagesOnQueue )
+				return;
+
+			Message message = consumer.receive(1000);
+			if (message == null)
+				continue;
+
+			try {
+				onMessage(message);
+				session.commit();
+			} catch (Throwable e) {
+				session.rollback();
+			}
+		}
+
+		fail("Did not receive all the messages.");
+	}
+
+	public void testWithMessageListener() throws Throwable {
+		latch = new CountDownLatch(numberOfMessagesOnQueue);
+		new DelegatingTransactionalMessageListener(this, connection,
+				destination);
+
+		long start = System.currentTimeMillis();
+		while ((System.currentTimeMillis() - start) < 1000*1000) {
+
+			if (getFailure() != null) {
+				throw getFailure();
+			}
+
+			if (latch.await(1, TimeUnit.SECONDS)) {
+				System.out.println("Received: " + deliveryCounter.get()
+						+ "  message(s)");
+				return;
+			}
+
+		}
+
+		fail("Did not receive all the messages.");
+	}
+
+
+	protected void setUp() throws Exception {
+		super.setUp();
+
+		connection = createConnection();
+		connection.start();
+
+		// lets fill the queue up
+		for (int i = 0; i < numberOfMessagesOnQueue; i++) {
+			template.send(createMessageCreator(i));
+		}
+
+	}
+
+	protected void tearDown() throws Exception {
+		if (connection != null) {
+			connection.close();
+		}
+		super.tearDown();
+	}
+
+	protected MessageCreator createMessageCreator(final int i) {
+		return new MessageCreator() {
+			public Message createMessage(Session session) throws JMSException {
+				TextMessage answer = session.createTextMessage("Message: " + i);
+				answer.setIntProperty("Counter", i);
+				return answer;
+			}
+		};
+	}
+
+	public void onMessage(Message message) {
+		String msgId = null;
+		String msgText = null;
+
+		try {
+			msgId = message.getJMSMessageID();
+			msgText = ((TextMessage) message).getText();
+		} catch (JMSException e) {
+			setFailure(e);
+		}
+
+		try {
+			assertEquals("Message: " + ackCounter.get(), msgText);
+		} catch (Throwable e) {
+			setFailure(e);
+		}
+
+		int value = deliveryCounter.incrementAndGet();
+		if (value % 2 == 0) {
+			log.info("Rolling Back message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
+			throw new RuntimeException("Dummy exception on message: " + value);
+		}
+
+		log.info("Received message: " + ackCounter.get() + " id: " + msgId + ", content: " + msgText);
+		ackCounter.incrementAndGet();
+		latch.countDown();
+	}
+
+	public synchronized Throwable getFailure() {
+		return failure;
+	}
+
+	public synchronized void setFailure(Throwable failure) {
+		this.failure = failure;
+	}
+}

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/branches/activemq-4.0/activemq-core/src/test/java/org/apache/activemq/test/rollback/RollbacksWhileConsumingLargeQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain