You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/10/25 11:03:55 UTC

camel git commit: CAMEL-9249: timer - Allow to specify a delay of -1 or something to indicate loop asap forever

Repository: camel
Updated Branches:
  refs/heads/master 1957a8282 -> 0ddf4b1ca


CAMEL-9249: timer - Allow to specify a delay of -1 or something to indicate loop asap forever


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ddf4b1c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ddf4b1c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ddf4b1c

Branch: refs/heads/master
Commit: 0ddf4b1ca8ca384d498f798e98fd236936f72fd4
Parents: 1957a82
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sat Oct 24 14:25:48 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Oct 25 11:01:38 2015 +0100

----------------------------------------------------------------------
 .../camel/component/timer/TimerConsumer.java    | 87 +++++++++++++-------
 .../camel/component/timer/TimerDelayTest.java   |  1 +
 .../component/timer/TimerNegativeDelayTest.java | 44 ++++++++++
 .../TimerNegativeNoRepeatCountDelayTest.java    | 57 +++++++++++++
 4 files changed, 159 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index c11506b..62261cb 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.timer;
 import java.util.Date;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.AsyncCallback;
@@ -40,6 +41,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
     private final TimerEndpoint endpoint;
     private volatile TimerTask task;
     private volatile boolean configured;
+    private ExecutorService executorService;
 
     public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -53,41 +55,60 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
 
     @Override
     protected void doStart() throws Exception {
-        task = new TimerTask() {
-            // counter
-            private final AtomicLong counter = new AtomicLong();
-
-            @Override
-            public void run() {
-                if (!isTaskRunAllowed()) {
-                    // do not run timer task as it was not allowed
-                    LOG.debug("Run now allowed for timer: {}", endpoint);
-                    return;
-                }
+        if (endpoint.getDelay() >= 0) { 
+            task = new TimerTask() {
+                // counter
+                private final AtomicLong counter = new AtomicLong();
 
-                try {
-                    long count = counter.incrementAndGet();
+                @Override
+                public void run() {
+                    if (!isTaskRunAllowed()) {
+                        // do not run timer task as it was not allowed
+                        LOG.debug("Run now allowed for timer: {}", endpoint);
+                        return;
+                    }
 
-                    boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
-                    if (fire) {
-                        sendTimerExchange(count);
-                    } else {
-                        // no need to fire anymore as we exceeded repeat count
-                        LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
-                        cancel();
+                    try {
+                        long count = counter.incrementAndGet();
+
+                        boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
+                        if (fire) {
+                            sendTimerExchange(count);
+                        } else {
+                            // no need to fire anymore as we exceeded repeat
+                            // count
+                            LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
+                            cancel();
+                        }
+                    } catch (Throwable e) {
+                        // catch all to avoid the JVM closing the thread and not
+                        // firing again
+                        LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
                     }
-                } catch (Throwable e) {
-                    // catch all to avoid the JVM closing the thread and not firing again
-                    LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
                 }
+            };
+
+            // only configure the task if the CamelContext has already
+            // started; otherwise the StartupListener configures the
+            // task later
+            if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
+                Timer timer = endpoint.getTimer(this);
+                configureTask(task, timer);
             }
-        };
+        } else {
+            // if the delay is negative then we use an ExecutorService and fire messages as soon as possible
+            executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri());
 
-        // only configure task if CamelContext already started, otherwise the StartupListener
-        // is configuring the task later
-        if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
-            Timer timer = endpoint.getTimer(this);
-            configureTask(task, timer);
+            executorService.execute(new Runnable() {
+                public void run() {
+                    final AtomicLong counter = new AtomicLong();
+                    long count = counter.incrementAndGet();
+                    while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) {
+                        sendTimerExchange(count);
+                        count = counter.incrementAndGet();
+                    }
+                }
+            });
         }
     }
 
@@ -101,6 +122,12 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
 
         // remove timer
         endpoint.removeTimer(this);
+        
+        // shut down the executorService if one was created
+        if (executorService != null) {
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(executorService);
+            executorService = null;
+        }
     }
 
     @Override
@@ -108,7 +135,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
         if (task != null && !configured) {
             Timer timer = endpoint.getTimer(this);
             configureTask(task, timer);
-        }
+        } 
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
index 6a822d8..9f534be 100644
--- a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
@@ -38,6 +38,7 @@ public class TimerDelayTest extends ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("timer://foo?delay=500&period=0").to("mock:result");
+                
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
new file mode 100644
index 0000000..86d4f27
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.timer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class TimerNegativeDelayTest extends ContextTestSupport {
+
+    public void testDelay() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(10);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer://foo?delay=-1&period=0&repeatCount=10").to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
new file mode 100644
index 0000000..be00f13
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.timer;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class TimerNegativeNoRepeatCountDelayTest extends ContextTestSupport {
+
+    public void testNegativeDelay() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        List<Exchange> exchanges = mock.getExchanges();
+        
+        context.stopRoute("routeTest");
+        
+        Iterator<Exchange> iter = exchanges.iterator();
+        
+        while (iter.hasNext()) {
+            Exchange exchange = (Exchange) iter.next();
+            assertEquals("negativeDelay", exchange.getProperty(Exchange.TIMER_NAME));
+            assertNotNull(exchange.getProperty(Exchange.TIMER_FIRED_TIME));
+            assertNotNull(exchange.getIn().getHeader("firedTime"));
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("timer://negativeDelay?delay=-1").routeId("routeTest").to("mock:result");
+            }
+        };
+    }
+}