You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by wt...@apache.org on 2009/04/16 21:44:16 UTC

svn commit: r765729 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/StreamResequencer.java test/java/org/apache/camel/processor/StreamResequencerTest.java

Author: wtam
Date: Thu Apr 16 19:44:16 2009
New Revision: 765729

URL: http://svn.apache.org/viewvc?rev=765729&view=rev
Log:
[CAMEL-1510]  BatchProcessor interrupt has side effects (submitted on behalf of Martin Krasser)

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=765729&r1=765728&r2=765729&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Thu Apr 16 19:44:16 2009
@@ -16,6 +16,11 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
@@ -160,7 +165,8 @@
 
     private class Delivery extends Thread {
 
-        private volatile boolean cancelRequested;
+        private Lock deliveryRequestLock = new ReentrantLock();
+        private Condition deliveryRequestCondition = deliveryRequestLock.newCondition();
         
         public Delivery() {
             super("Resequencer Delivery Thread");
@@ -170,11 +176,14 @@
         public void run() {
             while (true) {
                 try {
-                    Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
-                } catch (InterruptedException e) {
-                    if (cancelRequested) {
-                        return;
+                    deliveryRequestLock.lock();
+                    try {
+                        deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS);
+                    } finally {
+                        deliveryRequestLock.unlock();
                     }
+                } catch (InterruptedException e) {
+                    break;
                 }
                 try {
                     engine.deliver();
@@ -185,12 +194,16 @@
         }
 
         public void cancel() {
-            cancelRequested = true;
             interrupt();
         }
         
         public void request() {
-            interrupt();
+            deliveryRequestLock.lock();
+            try {
+                deliveryRequestCondition.signal();
+            } finally {
+                deliveryRequestLock.unlock();
+            }
         }
         
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=765729&r1=765728&r2=765729&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Thu Apr 16 19:44:16 2009
@@ -22,6 +22,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
 import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -54,6 +55,22 @@
         resultEndpoint.assertIsSatisfied();
     }
 
+    public void testMultithreaded() throws Exception {
+        int numMessages = 500;
+        Thread t1 = new Sender(context.createProducerTemplate(), 0, numMessages, 2);
+        Thread t2 = new Sender(context.createProducerTemplate(), 1, numMessages + 1, 2);
+        Object[] bodies = new Object[numMessages];
+        for (int i = 0; i < numMessages; i++) {
+            bodies[i] = "msg" + i;
+        }
+        resultEndpoint.expectedBodiesReceived(bodies);
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+        resultEndpoint.assertIsSatisfied();
+    }
+    
     @Override
     protected void setUp() throws Exception {
         super.setUp();
@@ -111,5 +128,34 @@
 
         assertIsInstanceOf(StreamResequencer.class, outputProcessor);
     }
+    
+    private static class Sender extends Thread {
+        
+        ProducerTemplate template;
+
+        int start;
+        int end;
+        int increment;
+        
+        public Sender(ProducerTemplate template, int start, int end, int increment) {
+            this.template = template;
+            this.start = start;
+            this.end = end;
+            this.increment = increment;
+        }
+
+        @Override
+        public void run() {
+            for (int i = start; i < end; i += increment) {
+                try {
+                    Thread.sleep(2);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+                template.sendBodyAndHeader("direct:start", "msg" + i, "seqnum", Long.valueOf(i));
+            }
+        }
+        
+    }
 }