You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by wt...@apache.org on 2009/04/16 21:44:16 UTC
svn commit: r765729 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/StreamResequencer.java
test/java/org/apache/camel/processor/StreamResequencerTest.java
Author: wtam
Date: Thu Apr 16 19:44:16 2009
New Revision: 765729
URL: http://svn.apache.org/viewvc?rev=765729&view=rev
Log:
[CAMEL-1510] BatchProcessor interrupt has side effects (submitted on behalf of Martin Krasser)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java?rev=765729&r1=765728&r2=765729&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/StreamResequencer.java Thu Apr 16 19:44:16 2009
@@ -16,6 +16,11 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.LoggingExceptionHandler;
@@ -160,7 +165,8 @@
private class Delivery extends Thread {
- private volatile boolean cancelRequested;
+ private Lock deliveryRequestLock = new ReentrantLock();
+ private Condition deliveryRequestCondition = deliveryRequestLock.newCondition();
public Delivery() {
super("Resequencer Delivery Thread");
@@ -170,11 +176,14 @@
public void run() {
while (true) {
try {
- Thread.sleep(DELIVERY_ATTEMPT_INTERVAL);
- } catch (InterruptedException e) {
- if (cancelRequested) {
- return;
+ deliveryRequestLock.lock();
+ try {
+ deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS);
+ } finally {
+ deliveryRequestLock.unlock();
}
+ } catch (InterruptedException e) {
+ break;
}
try {
engine.deliver();
@@ -185,12 +194,16 @@
}
public void cancel() {
- cancelRequested = true;
interrupt();
}
public void request() {
- interrupt();
+ deliveryRequestLock.lock();
+ try {
+ deliveryRequestCondition.signal();
+ } finally {
+ deliveryRequestLock.unlock();
+ }
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=765729&r1=765728&r2=765729&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Thu Apr 16 19:44:16 2009
@@ -22,6 +22,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
import org.apache.camel.Route;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -54,6 +55,22 @@
resultEndpoint.assertIsSatisfied();
}
+ /**
+  * Runs two {@link Sender} threads concurrently, one sending the even and one the odd
+  * sequence numbers below 500, and expects the mock endpoint to receive
+  * {@code msg0} ... {@code msg499} in that order.
+  */
+ public void testMultithreaded() throws Exception {
+     final int total = 500;
+     Thread evenSender = new Sender(context.createProducerTemplate(), 0, total, 2);
+     // the upper bound total + 1 is exclusive, so the last odd number sent is still total - 1
+     Thread oddSender = new Sender(context.createProducerTemplate(), 1, total + 1, 2);
+
+     // expected bodies: "msg0", "msg1", ..., one per sequence number
+     Object[] expected = new Object[total];
+     int next = 0;
+     while (next < total) {
+         expected[next] = "msg" + next;
+         next++;
+     }
+     resultEndpoint.expectedBodiesReceived(expected);
+
+     evenSender.start();
+     oddSender.start();
+     // wait for both senders to finish before checking the expectations
+     evenSender.join();
+     oddSender.join();
+     resultEndpoint.assertIsSatisfied();
+ }
+
@Override
protected void setUp() throws Exception {
super.setUp();
@@ -111,5 +128,34 @@
assertIsInstanceOf(StreamResequencer.class, outputProcessor);
}
+
+ /**
+  * Helper thread that sends the bodies {@code "msg" + i} to {@code direct:start} for
+  * {@code i = start, start + increment, ...} while {@code i < end}, setting the
+  * {@code seqnum} header to {@code i} on each message.
+  * <p>
+  * If the thread is interrupted while pausing between sends, it restores its interrupt
+  * status and stops sending.
+  */
+ private static class Sender extends Thread {
+
+     private final ProducerTemplate template;
+
+     private final int start;
+     private final int end;
+     private final int increment;
+
+     /**
+      * @param template  template used to send the messages
+      * @param start     first sequence number to send (inclusive)
+      * @param end       upper bound of the sequence numbers (exclusive)
+      * @param increment step between two consecutive sequence numbers
+      */
+     public Sender(ProducerTemplate template, int start, int end, int increment) {
+         this.template = template;
+         this.start = start;
+         this.end = end;
+         this.increment = increment;
+     }
+
+     @Override
+     public void run() {
+         for (int i = start; i < end; i += increment) {
+             try {
+                 // short pause before each send
+                 Thread.sleep(2);
+             } catch (InterruptedException e) {
+                 // do not swallow the interrupt: restore the flag and stop sending
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+             template.sendBodyAndHeader("direct:start", "msg" + i, "seqnum", Long.valueOf(i));
+         }
+     }
+
+ }
}