You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/02 14:02:04 UTC
[1/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop
routes after X messages or time
Repository: camel
Updated Branches:
refs/heads/master fd0db5611 -> 0e01cfb55
CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc3805a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc3805a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc3805a8
Branch: refs/heads/master
Commit: dc3805a8a257f41a8f04da22985209c12c917e37
Parents: ce1f04e
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 13:59:09 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 14:03:17 2017 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/main/MainSupport.java | 28 ++++++++++++++++++++
.../org/apache/camel/impl/MainSupportTest.java | 5 ++++
2 files changed, 33 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
index 03b65d8..50b76e6 100644
--- a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
@@ -32,6 +32,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultModelJAXBContextFactory;
+import org.apache.camel.impl.DurationRoutePolicyFactory;
import org.apache.camel.impl.FileWatcherReloadStrategy;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.ModelJAXBContextFactory;
@@ -56,6 +57,7 @@ public abstract class MainSupport extends ServiceSupport {
protected final AtomicBoolean completed = new AtomicBoolean(false);
protected final AtomicInteger exitCode = new AtomicInteger(UNINITIALIZED_EXIT_CODE);
protected long duration = -1;
+ protected int durationMaxMessages;
protected TimeUnit timeUnit = TimeUnit.MILLISECONDS;
protected boolean trace;
protected List<RouteBuilder> routeBuilders = new ArrayList<RouteBuilder>();
@@ -116,6 +118,13 @@ public abstract class MainSupport extends ServiceSupport {
setDuration(Integer.parseInt(value));
}
});
+ addOption(new ParameterOption("dm", "durationMaxMessages",
+ "Sets the maximum messages duration that the application will process before terminating.",
+ "durationMaxMessages") {
+ protected void doProcess(String arg, String parameter, LinkedList<String> remainingArgs) {
+ setDurationMaxMessages(Integer.parseInt(parameter));
+ }
+ });
addOption(new Option("t", "trace", "Enables tracing") {
protected void doProcess(String arg, LinkedList<String> remainingArgs) {
enableTrace();
@@ -324,6 +333,18 @@ public abstract class MainSupport extends ServiceSupport {
this.duration = duration;
}
+ public int getDurationMaxMessages() {
+ return durationMaxMessages;
+ }
+
+ /**
+ * Sets the maximum number of messages the application should process before it is
+ * terminated. Defaults to 0. Any value <= 0 disables the message limit.
+ */
+ public void setDurationMaxMessages(int durationMaxMessages) {
+ this.durationMaxMessages = durationMaxMessages;
+ }
+
public TimeUnit getTimeUnit() {
return timeUnit;
}
@@ -520,6 +541,13 @@ public abstract class MainSupport extends ServiceSupport {
}
}
+ if (durationMaxMessages > 0) {
+ DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
+ factory.setMaxMessages(durationMaxMessages);
+ LOG.debug("Adding DurationRoutePolicyFactory with maxMessages: {}", durationMaxMessages);
+ camelContext.addRoutePolicyFactory(factory);
+ }
+
// try to load the route builders from the routeBuilderClasses
loadRouteBuilders(camelContext);
for (RouteBuilder routeBuilder : routeBuilders) {
http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
index 756ca42..2607c81 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
@@ -44,6 +44,11 @@ public class MainSupportTest extends ContextTestSupport {
my.run(new String[]{"-d", "1s"});
}
+ public void testMainSupportMaxMessages() throws Exception {
+ MyMainSupport my = new MyMainSupport();
+ my.run(new String[]{"-d", "1s", "-dm", "2"});
+ }
+
public void testMainSupportHelp() throws Exception {
MyMainSupport my = new MyMainSupport();
my.run(new String[]{"-h"});
[3/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop
routes after X messages or time
Posted by da...@apache.org.
CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e01cfb5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e01cfb5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e01cfb5
Branch: refs/heads/master
Commit: 0e01cfb558b6e97deddf5c515458fa26895798d0
Parents: dc3805a
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 15:00:07 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 15:00:07 2017 +0100
----------------------------------------------------------------------
.../camel/main/MainDurationEventNotifier.java | 75 ++++++++++++++++++++
.../java/org/apache/camel/main/MainSupport.java | 18 +++--
.../java/org/apache/camel/maven/RunMojo.java | 23 ++++--
3 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
new file mode 100644
index 0000000..34c8241
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.main;
+
+import java.util.EventObject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.management.event.ExchangeCompletedEvent;
+import org.apache.camel.management.event.ExchangeFailedEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.spi.EventNotifier} to trigger shutdown of the Main JVM
+ * when maximum number of messages has been processed.
+ */
+public class MainDurationEventNotifier extends EventNotifierSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MainLifecycleStrategy.class);
+ private final CamelContext camelContext;
+ private final int maxMessages;
+ private final AtomicBoolean completed;
+ private final CountDownLatch latch;
+
+ private volatile int doneMessages;
+
+ public MainDurationEventNotifier(CamelContext camelContext, int maxMessages, AtomicBoolean completed, CountDownLatch latch) {
+ this.camelContext = camelContext;
+ this.maxMessages = maxMessages;
+ this.completed = completed;
+ this.latch = latch;
+ }
+
+ @Override
+ public void notify(EventObject event) throws Exception {
+ doneMessages++;
+
+ if (maxMessages > 0 && doneMessages >= maxMessages) {
+ if (completed.compareAndSet(false, true)) {
+ LOG.info("Duration max messages triggering shutdown of the JVM.");
+ // shutting down CamelContext
+ camelContext.stop();
+ // trigger stopping the Main
+ latch.countDown();
+ }
+ }
+ }
+
+ @Override
+ public boolean isEnabled(EventObject event) {
+ return event instanceof ExchangeCompletedEvent || event instanceof ExchangeFailedEvent;
+ }
+
+ @Override
+ public String toString() {
+ return "MainDurationEventNotifier[" + maxMessages + " max messages]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
index 50b76e6..58a9d66 100644
--- a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
@@ -35,6 +35,7 @@ import org.apache.camel.impl.DefaultModelJAXBContextFactory;
import org.apache.camel.impl.DurationRoutePolicyFactory;
import org.apache.camel.impl.FileWatcherReloadStrategy;
import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.ModelJAXBContextFactory;
import org.apache.camel.spi.ReloadStrategy;
import org.apache.camel.support.ServiceSupport;
@@ -119,7 +120,7 @@ public abstract class MainSupport extends ServiceSupport {
}
});
addOption(new ParameterOption("dm", "durationMaxMessages",
- "Sets the maximum messages duration that the application will process before terminating.",
+ "Sets the duration of maximum number of messages that the application will process before terminating.",
"durationMaxMessages") {
protected void doProcess(String arg, String parameter, LinkedList<String> remainingArgs) {
setDurationMaxMessages(Integer.parseInt(parameter));
@@ -430,6 +431,11 @@ public abstract class MainSupport extends ServiceSupport {
latch.await(duration, unit);
exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, durationHitExitCode);
completed.set(true);
+ } else if (durationMaxMessages > 0) {
+ LOG.info("Waiting until: " + durationMaxMessages + " messages has been processed");
+ exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, durationHitExitCode);
+ latch.await();
+ completed.set(true);
} else {
latch.await();
}
@@ -445,6 +451,7 @@ public abstract class MainSupport extends ServiceSupport {
public void run(String[] args) throws Exception {
parseArguments(args);
run();
+ LOG.info("MainSupport exiting code: {}", getExitCode());
}
/**
@@ -542,10 +549,11 @@ public abstract class MainSupport extends ServiceSupport {
}
if (durationMaxMessages > 0) {
- DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
- factory.setMaxMessages(durationMaxMessages);
- LOG.debug("Adding DurationRoutePolicyFactory with maxMessages: {}", durationMaxMessages);
- camelContext.addRoutePolicyFactory(factory);
+ // register an event notifier so we can trigger shutdown of the JVM when the maximum number of messages has been processed
+ EventNotifier notifier = new MainDurationEventNotifier(camelContext, durationMaxMessages, completed, latch);
+ // register our event notifier
+ ServiceHelper.startService(notifier);
+ camelContext.getManagementStrategy().addEventNotifier(notifier);
}
// try to load the route builders from the routeBuilderClasses
http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
----------------------------------------------------------------------
diff --git a/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java b/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
index f7170b6..1f132e3 100644
--- a/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
+++ b/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
@@ -95,6 +95,15 @@ public class RunMojo extends AbstractExecMojo {
protected String duration;
/**
+ * Sets the maximum number of messages that the application will process before terminating.
+ *
+ * @parameter property="camel.duration.maxMessages"
+ * default-value="-1"
+ *
+ */
+ protected String durationMaxMessages;
+
+ /**
* Whether to log the classpath when starting
*
* @parameter property="camel.logClasspath"
@@ -288,7 +297,7 @@ public class RunMojo extends AbstractExecMojo {
private ExecutableDependency executableDependency;
/**
- * Wether to interrupt/join and possibly stop the daemon threads upon
+ * Whether to interrupt/join and possibly stop the daemon threads upon
* quitting. <br/> If this is <code>false</code>, maven does nothing
* about the daemon threads. When maven has no more work to do, the VM will
* normally terminate any remaining daemon threads.
@@ -424,9 +433,15 @@ public class RunMojo extends AbstractExecMojo {
args.add(basedPackages);
usingSpringJavaConfigureMain = true;
}
-
- args.add("-d");
- args.add(duration);
+
+ if (!duration.equals("-1")) {
+ args.add("-d");
+ args.add(duration);
+ }
+ if (!durationMaxMessages.equals("-1")) {
+ args.add("-dm");
+ args.add(durationMaxMessages);
+ }
if (arguments != null) {
args.addAll(Arrays.asList(arguments));
}
[2/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop
routes after X messages or time
Posted by da...@apache.org.
CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce1f04e2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce1f04e2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce1f04e2
Branch: refs/heads/master
Commit: ce1f04e2dd3c49277794bbfd92b21738b678da40
Parents: fd0db56
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 13:52:45 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 14:03:17 2017 +0100
----------------------------------------------------------------------
.../apache/camel/impl/DurationRoutePolicy.java | 188 +++++++++++++++++++
.../camel/impl/DurationRoutePolicyFactory.java | 93 +++++++++
.../impl/DurationRoutePolicyFactoryTest.java | 66 +++++++
.../DurationRoutePolicyMaxMessagesTest.java | 63 +++++++
.../impl/DurationRoutePolicyMaxSecondsTest.java | 63 +++++++
5 files changed, 473 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
new file mode 100644
index 0000000..4881b8d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * {@link org.apache.camel.spi.RoutePolicy} which executes for a duration and then triggers an action.
+ * <p/>
+ * This can be used to stop the route after it has processed a number of messages, or has been running for N seconds.
+ */
+public class DurationRoutePolicy extends org.apache.camel.support.RoutePolicySupport implements CamelContextAware {
+
+ enum Action {
+ STOP_CAMEL_CONTEXT, STOP_ROUTE, SUSPEND_ROUTE, SUSPEND_ALL_ROUTES
+ }
+
+ private CamelContext camelContext;
+ private String routeId;
+ private ScheduledExecutorService executorService;
+ private volatile ScheduledFuture task;
+ private volatile int doneMessages;
+ private AtomicBoolean actionDone = new AtomicBoolean();
+
+ private Action action = Action.STOP_ROUTE;
+ private int maxMessages;
+ private int maxSeconds;
+
+ public DurationRoutePolicy() {
+ }
+
+ public DurationRoutePolicy(CamelContext camelContext, String routeId) {
+ this.camelContext = camelContext;
+ this.routeId = routeId;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ public int getMaxMessages() {
+ return maxMessages;
+ }
+
+ /**
+ * Maximum number of messages to process before the action is triggered
+ */
+ public void setMaxMessages(int maxMessages) {
+ this.maxMessages = maxMessages;
+ }
+
+ public int getMaxSeconds() {
+ return maxSeconds;
+ }
+
+ /**
+ * Maximum seconds Camel is running before the action is triggered
+ */
+ public void setMaxSeconds(int maxSeconds) {
+ this.maxSeconds = maxSeconds;
+ }
+
+ public Action getAction() {
+ return action;
+ }
+
+ /**
+ * What action to perform when maximum is triggered.
+ */
+ public void setAction(Action action) {
+ this.action = action;
+ }
+
+ @Override
+ public void onInit(Route route) {
+ super.onInit(route);
+
+ ObjectHelper.notNull(camelContext, "camelContext", this);
+
+ if (maxMessages == 0 && maxSeconds == 0) {
+ throw new IllegalArgumentException("The options maxMessages or maxSeconds must be configured");
+ }
+
+ if (routeId == null) {
+ this.routeId = route.getId();
+ }
+
+ if (executorService == null) {
+ executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "DurationRoutePolicy[" + routeId + "]");
+ }
+
+ if (maxSeconds > 0) {
+ task = performMaxDurationAction();
+ }
+ }
+
+ @Override
+ public void onExchangeDone(Route route, Exchange exchange) {
+ doneMessages++;
+
+ if (maxMessages > 0 && doneMessages >= maxMessages) {
+ if (actionDone.compareAndSet(false, true)) {
+ performMaxMessagesAction();
+ if (task != null && !task.isDone()) {
+ task.cancel(false);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (task != null && !task.isDone()) {
+ task.cancel(false);
+ }
+
+ if (executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+ executorService = null;
+ }
+ }
+
+ protected void performMaxMessagesAction() {
+ executorService.submit(createTask(true));
+ }
+
+ protected ScheduledFuture performMaxDurationAction() {
+ return executorService.schedule(createTask(false), maxSeconds, TimeUnit.SECONDS);
+ }
+
+ private Runnable createTask(boolean maxMessagesHit) {
+ return () -> {
+ try {
+ String tail;
+ if (maxMessagesHit) {
+ tail = " due max messages " + getMaxMessages() + " processed";
+ } else {
+ tail = " due max seconds " + getMaxSeconds();
+ }
+
+ if (action == Action.STOP_CAMEL_CONTEXT) {
+ log.info("Stopping CamelContext {}", tail);
+ camelContext.stop();
+ } else if (action == Action.STOP_ROUTE) {
+ log.info("Stopping route: {}{}", routeId, tail);
+ camelContext.stopRoute(routeId);
+ } else if (action == Action.SUSPEND_ROUTE) {
+ log.info("Suspending route: {}{}", routeId, tail);
+ camelContext.suspendRoute(routeId);
+ } else if (action == Action.SUSPEND_ALL_ROUTES) {
+ log.info("Suspending all routes {}", tail);
+ camelContext.suspend();
+ }
+ } catch (Throwable e) {
+ log.warn("Error performing action: " + action, e);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
new file mode 100644
index 0000000..849e00c
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.util.EndpointHelper;
+
+/**
+ * {@link org.apache.camel.spi.RoutePolicyFactory} which executes for a duration and then triggers an action.
+ * <p/>
+ * This can be used to stop a set of routes (or CamelContext) after it has processed a number of messages, or has been running for N seconds.
+ * <p/>
+ * A new {@link DurationRoutePolicy} is created for every route that matches the optional route id pattern.
+ */
+public class DurationRoutePolicyFactory implements RoutePolicyFactory {
+
+    private String fromRouteId;
+    private int maxMessages;
+    private int maxSeconds;
+    private DurationRoutePolicy.Action action = DurationRoutePolicy.Action.STOP_ROUTE;
+
+    /**
+     * Creates a {@link DurationRoutePolicy} configured with the options of this factory.
+     *
+     * @return the policy, or <tt>null</tt> if a route id pattern has been configured and the route id does not match it
+     */
+    @Override
+    public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) {
+        DurationRoutePolicy policy = null;
+
+        if (fromRouteId == null || EndpointHelper.matchPattern(routeId, fromRouteId)) {
+            policy = new DurationRoutePolicy(camelContext, routeId);
+            policy.setMaxMessages(maxMessages);
+            policy.setMaxSeconds(maxSeconds);
+            policy.setAction(action);
+        }
+
+        return policy;
+    }
+
+    public String getFromRouteId() {
+        return fromRouteId;
+    }
+
+    /**
+     * Limit the route policy to the route which matches this pattern
+     *
+     * @see EndpointHelper#matchPattern(String, String)
+     */
+    public void setFromRouteId(String fromRouteId) {
+        this.fromRouteId = fromRouteId;
+    }
+
+    public int getMaxMessages() {
+        return maxMessages;
+    }
+
+    /**
+     * Maximum number of messages to process before the action is triggered
+     */
+    public void setMaxMessages(int maxMessages) {
+        this.maxMessages = maxMessages;
+    }
+
+    public int getMaxSeconds() {
+        return maxSeconds;
+    }
+
+    /**
+     * Maximum seconds Camel is running before the action is triggered
+     */
+    public void setMaxSeconds(int maxSeconds) {
+        this.maxSeconds = maxSeconds;
+    }
+
+    public DurationRoutePolicy.Action getAction() {
+        return action;
+    }
+
+    /**
+     * What action to perform when maximum is triggered.
+     */
+    public void setAction(DurationRoutePolicy.Action action) {
+        this.action = action;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..3c15f9e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyFactoryTest extends ContextTestSupport {
+
+    /**
+     * Verifies that a policy created by the factory stops the route.
+     */
+    public void testDurationRoutePolicyFactory() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 2 seconds which is approx 20-30 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(10);
+        assertMockEndpointsSatisfied();
+
+        // the route is stopped asynchronously, so poll for up to ~1 second before asserting
+        // (a failed assertion throws AssertionError, which catching Exception would not intercept)
+        for (int i = 0; i < 10 && !context.getRouteStatus("foo").isStopped(); i++) {
+            Thread.sleep(100);
+        }
+
+        assertFalse(context.getRouteStatus("foo").isStarted());
+        assertTrue(context.getRouteStatus("foo").isStopped());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
+                factory.setMaxSeconds(2);
+                factory.setMaxMessages(25);
+
+                // the factory applies the policy to the routes created afterwards
+                getContext().addRoutePolicyFactory(factory);
+
+                from("timer:foo?period=100").routeId("foo")
+                    .to("mock:foo");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
new file mode 100644
index 0000000..c7c873f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyMaxMessagesTest extends ContextTestSupport {
+
+    /**
+     * Verifies that the route is stopped after the configured maximum number of messages.
+     */
+    public void testDurationRoutePolicy() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 5 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(5);
+        assertMockEndpointsSatisfied();
+
+        // the route is stopped asynchronously, so poll for up to ~1 second before asserting
+        // (a failed assertion throws AssertionError, which catching Exception would not intercept)
+        for (int i = 0; i < 10 && !context.getRouteStatus("foo").isStopped(); i++) {
+            Thread.sleep(100);
+        }
+
+        assertFalse(context.getRouteStatus("foo").isStarted());
+        assertTrue(context.getRouteStatus("foo").isStopped());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicy policy = new DurationRoutePolicy();
+                policy.setMaxMessages(5);
+
+                from("timer:foo?period=100").routeId("foo").routePolicy(policy)
+                    .to("mock:foo");
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
new file mode 100644
index 0000000..e0b0163
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyMaxSecondsTest extends ContextTestSupport {
+
+    /**
+     * Verifies that the route is stopped after the configured maximum number of seconds.
+     */
+    public void testDurationRoutePolicy() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 2 seconds which is approx 20-30 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(10);
+        assertMockEndpointsSatisfied();
+
+        // the route is stopped asynchronously, so poll for up to ~1 second before asserting
+        // (a failed assertion throws AssertionError, which catching Exception would not intercept)
+        for (int i = 0; i < 10 && !context.getRouteStatus("foo").isStopped(); i++) {
+            Thread.sleep(100);
+        }
+
+        assertFalse(context.getRouteStatus("foo").isStarted());
+        assertTrue(context.getRouteStatus("foo").isStopped());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicy policy = new DurationRoutePolicy();
+                policy.setMaxSeconds(2);
+
+                from("timer:foo?period=100").routeId("foo").routePolicy(policy)
+                    .to("mock:foo");
+            }
+        };
+    }
+}