You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/10/25 11:03:55 UTC
camel git commit: CAMEL-9249: timer - Allow to specify a delay of -1
or something to indicate loop asap forever
Repository: camel
Updated Branches:
refs/heads/master 1957a8282 -> 0ddf4b1ca
CAMEL-9249: timer - Allow to specify a delay of -1 or something to indicate loop asap forever
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0ddf4b1c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0ddf4b1c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0ddf4b1c
Branch: refs/heads/master
Commit: 0ddf4b1ca8ca384d498f798e98fd236936f72fd4
Parents: 1957a82
Author: Andrea Cosentino <an...@gmail.com>
Authored: Sat Oct 24 14:25:48 2015 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sun Oct 25 11:01:38 2015 +0100
----------------------------------------------------------------------
.../camel/component/timer/TimerConsumer.java | 87 +++++++++++++-------
.../camel/component/timer/TimerDelayTest.java | 1 +
.../component/timer/TimerNegativeDelayTest.java | 44 ++++++++++
.../TimerNegativeNoRepeatCountDelayTest.java | 57 +++++++++++++
4 files changed, 159 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index c11506b..62261cb 100644
--- a/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.timer;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.AsyncCallback;
@@ -40,6 +41,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
private final TimerEndpoint endpoint;
private volatile TimerTask task;
private volatile boolean configured;
+ private ExecutorService executorService;
public TimerConsumer(TimerEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -53,41 +55,60 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
@Override
protected void doStart() throws Exception {
- task = new TimerTask() {
- // counter
- private final AtomicLong counter = new AtomicLong();
-
- @Override
- public void run() {
- if (!isTaskRunAllowed()) {
- // do not run timer task as it was not allowed
- LOG.debug("Run now allowed for timer: {}", endpoint);
- return;
- }
+ if (endpoint.getDelay() >= 0) {
+ task = new TimerTask() {
+ // counter
+ private final AtomicLong counter = new AtomicLong();
- try {
- long count = counter.incrementAndGet();
+ @Override
+ public void run() {
+ if (!isTaskRunAllowed()) {
+ // do not run timer task as it was not allowed
+ LOG.debug("Run now allowed for timer: {}", endpoint);
+ return;
+ }
- boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
- if (fire) {
- sendTimerExchange(count);
- } else {
- // no need to fire anymore as we exceeded repeat count
- LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
- cancel();
+ try {
+ long count = counter.incrementAndGet();
+
+ boolean fire = endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount();
+ if (fire) {
+ sendTimerExchange(count);
+ } else {
+ // no need to fire anymore as we exceeded repeat
+ // count
+ LOG.debug("Cancelling {} timer as repeat count limit reached after {} counts.", endpoint.getTimerName(), endpoint.getRepeatCount());
+ cancel();
+ }
+ } catch (Throwable e) {
+ // catch all to avoid the JVM closing the thread and not
+ // firing again
+ LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
}
- } catch (Throwable e) {
- // catch all to avoid the JVM closing the thread and not firing again
- LOG.warn("Error processing exchange. This exception will be ignored, to let the timer be able to trigger again.", e);
}
+ };
+
+ // only configure task if CamelContext already started, otherwise
+ // the StartupListener
+ // is configuring the task later
+ if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
+ Timer timer = endpoint.getTimer(this);
+ configureTask(task, timer);
}
- };
+ } else {
+ // if the delay is negative then we use an ExecutorService and fire messages as soon as possible
+ executorService = endpoint.getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, endpoint.getEndpointUri());
- // only configure task if CamelContext already started, otherwise the StartupListener
- // is configuring the task later
- if (!configured && endpoint.getCamelContext().getStatus().isStarted()) {
- Timer timer = endpoint.getTimer(this);
- configureTask(task, timer);
+ executorService.execute(new Runnable() {
+ public void run() {
+ final AtomicLong counter = new AtomicLong();
+ long count = counter.incrementAndGet();
+ while ((endpoint.getRepeatCount() <= 0 || count <= endpoint.getRepeatCount()) && isRunAllowed()) {
+ sendTimerExchange(count);
+ count = counter.incrementAndGet();
+ }
+ }
+ });
}
}
@@ -101,6 +122,12 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
// remove timer
endpoint.removeTimer(this);
+
+ // if executorService is instantiated then we shutdown it
+ if (executorService != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(executorService);
+ executorService = null;
+ }
}
@Override
@@ -108,7 +135,7 @@ public class TimerConsumer extends DefaultConsumer implements StartupListener {
if (task != null && !configured) {
Timer timer = endpoint.getTimer(this);
configureTask(task, timer);
- }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
index 6a822d8..9f534be 100644
--- a/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerDelayTest.java
@@ -38,6 +38,7 @@ public class TimerDelayTest extends ContextTestSupport {
@Override
public void configure() throws Exception {
from("timer://foo?delay=500&period=0").to("mock:result");
+
}
};
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
new file mode 100644
index 0000000..86d4f27
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeDelayTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.timer;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class TimerNegativeDelayTest extends ContextTestSupport {
+
+ public void testDelay() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(10);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("timer://foo?delay=-1&period=0&repeatCount=10").to("mock:result");
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0ddf4b1c/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
new file mode 100644
index 0000000..be00f13
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/timer/TimerNegativeNoRepeatCountDelayTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.timer;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class TimerNegativeNoRepeatCountDelayTest extends ContextTestSupport {
+
+ public void testNegativeDelay() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ List<Exchange> exchanges = mock.getExchanges();
+
+ context.stopRoute("routeTest");
+
+ Iterator<Exchange> iter = exchanges.iterator();
+
+ while (iter.hasNext()) {
+ Exchange exchange = (Exchange) iter.next();
+ assertEquals("negativeDelay", exchange.getProperty(Exchange.TIMER_NAME));
+ assertNotNull(exchange.getProperty(Exchange.TIMER_FIRED_TIME));
+ assertNotNull(exchange.getIn().getHeader("firedTime"));
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("timer://negativeDelay?delay=-1").routeId("routeTest").to("mock:result");
+ }
+ };
+ }
+}