You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/06/29 11:59:36 UTC

svn commit: r958907 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/SamplingThrottler.java test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java

Author: davsclaus
Date: Tue Jun 29 09:59:35 2010
New Revision: 958907

URL: http://svn.apache.org/viewvc?rev=958907&view=rev
Log:
CAMEL-2875: sampling DSL supports async routing engine.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java
      - copied, changed from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java?rev=958907&r1=958906&r2=958907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java Tue Jun 29 09:59:35 2010
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.commons.logging.Log;
@@ -36,9 +37,7 @@ import org.apache.commons.logging.LogFac
  *
  * @version $Revision$
  */
-public class SamplingThrottler extends DelegateProcessor {
-
-    // TODO: should support async routing engine
+public class SamplingThrottler extends DelegateAsyncProcessor {
 
     protected final transient Log log = LogFactory.getLog(getClass());
     private long samplePeriod;
@@ -72,7 +71,8 @@ public class SamplingThrottler extends D
         return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]";
     }
 
-    public void process(Exchange exchange) throws Exception {
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         boolean doSend = false;
 
         synchronized (calculationLock) {
@@ -91,10 +91,21 @@ public class SamplingThrottler extends D
         }
 
         if (doSend) {
-            super.process(exchange);
+            // continue routing
+            return super.process(exchange, callback);
         } else {
-            stopper.process(exchange);
+            // okay to invoke this synchronously as the stopper
+            // will just set a property
+            try {
+                stopper.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
         }
+
+        // we are done synchronously
+        callback.done(true);
+        return true;
     }
 
     private static class SampleStats {

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java (from r958893, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=958893&r2=958907&rev=958907&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointSamplingTest.java Tue Jun 29 09:59:35 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointSamplingTest extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -57,17 +57,19 @@ public class AsyncEndpointTest extends C
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("async:Bye Camel")
+                        .sample()
+                            .to("async:Bye Camel")
+                        .end()
+                        .to("log:after")
+                        .to("mock:after")
                         .process(new Processor() {
                             public void process(Exchange exchange) throws Exception {
                                 afterThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .to("log:after")
-                        .to("mock:after")
                         .to("mock:result");
             }
         };
     }
 
-}
+}
\ No newline at end of file