You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/29 11:59:36 UTC
svn commit: r958907 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/processor/SamplingThrottler.java
test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java
Author: davsclaus
Date: Tue Jun 29 09:59:35 2010
New Revision: 958907
URL: http://svn.apache.org/viewvc?rev=958907&view=rev
Log:
CAMEL-2875: The sampling DSL now supports the async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java
- copied, changed from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java?rev=958907&r1=958906&r2=958907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java Tue Jun 29 09:59:35 2010
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
import java.util.concurrent.TimeUnit;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.commons.logging.Log;
@@ -36,9 +37,7 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public class SamplingThrottler extends DelegateProcessor {
-
- // TODO: should support async routing engine
+public class SamplingThrottler extends DelegateAsyncProcessor {
protected final transient Log log = LogFactory.getLog(getClass());
private long samplePeriod;
@@ -72,7 +71,8 @@ public class SamplingThrottler extends D
return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]";
}
- public void process(Exchange exchange) throws Exception {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
boolean doSend = false;
synchronized (calculationLock) {
@@ -91,10 +91,21 @@ public class SamplingThrottler extends D
}
if (doSend) {
- super.process(exchange);
+ // continue routing
+ return super.process(exchange, callback);
} else {
- stopper.process(exchange);
+ // okay to invoke this synchronously as the stopper
+ // will just set a property
+ try {
+ stopper.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
}
+
+ // we are done synchronously
+ callback.done(true);
+ return true;
}
private static class SampleStats {
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java (from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=958893&r2=958907&rev=958907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java Tue Jun 29 09:59:35 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointSamplingTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -57,17 +57,19 @@ public class AsyncEndpointTest extends C
beforeThreadName = Thread.currentThread().getName();
}
})
- .to("async:Bye Camel")
+ .sample()
+ .to("async:Bye Camel")
+ .end()
+ .to("log:after")
+ .to("mock:after")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
afterThreadName = Thread.currentThread().getName();
}
})
- .to("log:after")
- .to("mock:after")
.to("mock:result");
}
};
}
-}
+}
\ No newline at end of file