You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/02 14:02:04 UTC

[1/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time

Repository: camel
Updated Branches:
  refs/heads/master fd0db5611 -> 0e01cfb55


CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/dc3805a8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/dc3805a8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/dc3805a8

Branch: refs/heads/master
Commit: dc3805a8a257f41a8f04da22985209c12c917e37
Parents: ce1f04e
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 13:59:09 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 14:03:17 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/main/MainSupport.java | 28 ++++++++++++++++++++
 .../org/apache/camel/impl/MainSupportTest.java  |  5 ++++
 2 files changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
index 03b65d8..50b76e6 100644
--- a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
@@ -32,6 +32,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultModelJAXBContextFactory;
+import org.apache.camel.impl.DurationRoutePolicyFactory;
 import org.apache.camel.impl.FileWatcherReloadStrategy;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.spi.ModelJAXBContextFactory;
@@ -56,6 +57,7 @@ public abstract class MainSupport extends ServiceSupport {
     protected final AtomicBoolean completed = new AtomicBoolean(false);
     protected final AtomicInteger exitCode = new AtomicInteger(UNINITIALIZED_EXIT_CODE);
     protected long duration = -1;
+    protected int durationMaxMessages;
     protected TimeUnit timeUnit = TimeUnit.MILLISECONDS;
     protected boolean trace;
     protected List<RouteBuilder> routeBuilders = new ArrayList<RouteBuilder>();
@@ -116,6 +118,13 @@ public abstract class MainSupport extends ServiceSupport {
                 setDuration(Integer.parseInt(value));
             }
         });
+        addOption(new ParameterOption("dm", "durationMaxMessages",
+                "Sets the maximum messages duration that the application will process before terminating.",
+                "durationMaxMessages") {
+            protected void doProcess(String arg, String parameter, LinkedList<String> remainingArgs) {
+                setDurationMaxMessages(Integer.parseInt(parameter));
+            }
+        });
         addOption(new Option("t", "trace", "Enables tracing") {
             protected void doProcess(String arg, LinkedList<String> remainingArgs) {
                 enableTrace();
@@ -324,6 +333,18 @@ public abstract class MainSupport extends ServiceSupport {
         this.duration = duration;
     }
 
+    public int getDurationMaxMessages() {
+        return durationMaxMessages;
+    }
+
+    /**
+     * Sets the duration to run the application to process at most max messages until it
+     * should be terminated. Defaults to -1. Any value <= 0 will run forever.
+     */
+    public void setDurationMaxMessages(int durationMaxMessages) {
+        this.durationMaxMessages = durationMaxMessages;
+    }
+
     public TimeUnit getTimeUnit() {
         return timeUnit;
     }
@@ -520,6 +541,13 @@ public abstract class MainSupport extends ServiceSupport {
             }
         }
 
+        if (durationMaxMessages > 0) {
+            DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
+            factory.setMaxMessages(durationMaxMessages);
+            LOG.debug("Adding DurationRoutePolicyFactory with maxMessages: {}", durationMaxMessages);
+            camelContext.addRoutePolicyFactory(factory);
+        }
+
         // try to load the route builders from the routeBuilderClasses
         loadRouteBuilders(camelContext);
         for (RouteBuilder routeBuilder : routeBuilders) {

http://git-wip-us.apache.org/repos/asf/camel/blob/dc3805a8/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
index 756ca42..2607c81 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MainSupportTest.java
@@ -44,6 +44,11 @@ public class MainSupportTest extends ContextTestSupport {
         my.run(new String[]{"-d", "1s"});
     }
 
+    public void testMainSupportMaxMessages() throws Exception {
+        MyMainSupport my = new MyMainSupport();
+        my.run(new String[]{"-d", "1s", "-dm", "2"});
+    }
+
     public void testMainSupportHelp() throws Exception {
         MyMainSupport my = new MyMainSupport();
         my.run(new String[]{"-h"});


[3/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time

Posted by da...@apache.org.
CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0e01cfb5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0e01cfb5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0e01cfb5

Branch: refs/heads/master
Commit: 0e01cfb558b6e97deddf5c515458fa26895798d0
Parents: dc3805a
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 15:00:07 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 15:00:07 2017 +0100

----------------------------------------------------------------------
 .../camel/main/MainDurationEventNotifier.java   | 75 ++++++++++++++++++++
 .../java/org/apache/camel/main/MainSupport.java | 18 +++--
 .../java/org/apache/camel/maven/RunMojo.java    | 23 ++++--
 3 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
new file mode 100644
index 0000000..34c8241
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.main;
+
+import java.util.EventObject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.management.event.ExchangeCompletedEvent;
+import org.apache.camel.management.event.ExchangeFailedEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link org.apache.camel.spi.EventNotifier} to trigger shutdown of the Main JVM
+ * when maximum number of messages has been processed.
+ */
+public class MainDurationEventNotifier extends EventNotifierSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MainLifecycleStrategy.class);
+    private final CamelContext camelContext;
+    private final int maxMessages;
+    private final AtomicBoolean completed;
+    private final CountDownLatch latch;
+
+    private volatile int doneMessages;
+
+    public MainDurationEventNotifier(CamelContext camelContext, int maxMessages, AtomicBoolean completed, CountDownLatch latch) {
+        this.camelContext = camelContext;
+        this.maxMessages = maxMessages;
+        this.completed = completed;
+        this.latch = latch;
+    }
+
+    @Override
+    public void notify(EventObject event) throws Exception {
+        doneMessages++;
+
+        if (maxMessages > 0 && doneMessages >= maxMessages) {
+            if (completed.compareAndSet(false, true)) {
+                LOG.info("Duration max messages triggering shutdown of the JVM.");
+                // shutting down CamelContext
+                camelContext.stop();
+                // trigger stopping the Main
+                latch.countDown();
+            }
+        }
+    }
+
+    @Override
+    public boolean isEnabled(EventObject event) {
+        return event instanceof ExchangeCompletedEvent || event instanceof ExchangeFailedEvent;
+    }
+
+    @Override
+    public String toString() {
+        return "MainDurationEventNotifier[" + maxMessages + " max messages]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
index 50b76e6..58a9d66 100644
--- a/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/main/MainSupport.java
@@ -35,6 +35,7 @@ import org.apache.camel.impl.DefaultModelJAXBContextFactory;
 import org.apache.camel.impl.DurationRoutePolicyFactory;
 import org.apache.camel.impl.FileWatcherReloadStrategy;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.ModelJAXBContextFactory;
 import org.apache.camel.spi.ReloadStrategy;
 import org.apache.camel.support.ServiceSupport;
@@ -119,7 +120,7 @@ public abstract class MainSupport extends ServiceSupport {
             }
         });
         addOption(new ParameterOption("dm", "durationMaxMessages",
-                "Sets the maximum messages duration that the application will process before terminating.",
+                "Sets the duration of maximum number of messages that the application will process before terminating.",
                 "durationMaxMessages") {
             protected void doProcess(String arg, String parameter, LinkedList<String> remainingArgs) {
                 setDurationMaxMessages(Integer.parseInt(parameter));
@@ -430,6 +431,11 @@ public abstract class MainSupport extends ServiceSupport {
                     latch.await(duration, unit);
                     exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, durationHitExitCode);
                     completed.set(true);
+                } else if (durationMaxMessages > 0) {
+                    LOG.info("Waiting until: " + durationMaxMessages + " messages has been processed");
+                    exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, durationHitExitCode);
+                    latch.await();
+                    completed.set(true);
                 } else {
                     latch.await();
                 }
@@ -445,6 +451,7 @@ public abstract class MainSupport extends ServiceSupport {
     public void run(String[] args) throws Exception {
         parseArguments(args);
         run();
+        LOG.info("MainSupport exiting code: {}", getExitCode());
     }
 
     /**
@@ -542,10 +549,11 @@ public abstract class MainSupport extends ServiceSupport {
         }
 
         if (durationMaxMessages > 0) {
-            DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
-            factory.setMaxMessages(durationMaxMessages);
-            LOG.debug("Adding DurationRoutePolicyFactory with maxMessages: {}", durationMaxMessages);
-            camelContext.addRoutePolicyFactory(factory);
+            // register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed
+            EventNotifier notifier = new MainDurationEventNotifier(camelContext, durationMaxMessages, completed, latch);
+            // register our event notifier
+            ServiceHelper.startService(notifier);
+            camelContext.getManagementStrategy().addEventNotifier(notifier);
         }
 
         // try to load the route builders from the routeBuilderClasses

http://git-wip-us.apache.org/repos/asf/camel/blob/0e01cfb5/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
----------------------------------------------------------------------
diff --git a/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java b/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
index f7170b6..1f132e3 100644
--- a/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
+++ b/tooling/maven/camel-maven-plugin/src/main/java/org/apache/camel/maven/RunMojo.java
@@ -95,6 +95,15 @@ public class RunMojo extends AbstractExecMojo {
     protected String duration;
 
     /**
+     * Sets the duration of maximum number of messages that the application will process before terminating.
+     *
+     * @parameter property="camel.duration.maxMessages"
+     *            default-value="-1"
+     *
+     */
+    protected String durationMaxMessages;
+
+    /**
      * Whether to log the classpath when starting
      *
      * @parameter property="camel.logClasspath"
@@ -288,7 +297,7 @@ public class RunMojo extends AbstractExecMojo {
     private ExecutableDependency executableDependency;
 
     /**
-     * Wether to interrupt/join and possibly stop the daemon threads upon
+     * Whether to interrupt/join and possibly stop the daemon threads upon
      * quitting. <br/> If this is <code>false</code>, maven does nothing
      * about the daemon threads. When maven has no more work to do, the VM will
      * normally terminate any remaining daemon threads.
@@ -424,9 +433,15 @@ public class RunMojo extends AbstractExecMojo {
             args.add(basedPackages);
             usingSpringJavaConfigureMain = true;
         }
- 
-        args.add("-d");
-        args.add(duration);
+
+        if (!duration.equals("-1")) {
+            args.add("-d");
+            args.add(duration);
+        }
+        if (!durationMaxMessages.equals("-1")) {
+            args.add("-dm");
+            args.add(durationMaxMessages);
+        }
         if (arguments != null) {
             args.addAll(Arrays.asList(arguments));
         }


[2/3] camel git commit: CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time

Posted by da...@apache.org.
CAMEL-19596: RoutePolicy - To easily stop routes after X messages or time


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce1f04e2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce1f04e2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce1f04e2

Branch: refs/heads/master
Commit: ce1f04e2dd3c49277794bbfd92b21738b678da40
Parents: fd0db56
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Mar 2 13:52:45 2017 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Mar 2 14:03:17 2017 +0100

----------------------------------------------------------------------
 .../apache/camel/impl/DurationRoutePolicy.java  | 188 +++++++++++++++++++
 .../camel/impl/DurationRoutePolicyFactory.java  |  93 +++++++++
 .../impl/DurationRoutePolicyFactoryTest.java    |  66 +++++++
 .../DurationRoutePolicyMaxMessagesTest.java     |  63 +++++++
 .../impl/DurationRoutePolicyMaxSecondsTest.java |  63 +++++++
 5 files changed, 473 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
new file mode 100644
index 0000000..4881b8d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicy.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * {@link org.apache.camel.spi.RoutePolicy} which executes for a duration and then triggers an action.
+ * <p/>
+ * This can be used to stop the route after it has processed a number of messages, or has been running for N seconds.
+ */
+public class DurationRoutePolicy extends org.apache.camel.support.RoutePolicySupport implements CamelContextAware {
+
+    enum Action {
+        STOP_CAMEL_CONTEXT, STOP_ROUTE, SUSPEND_ROUTE, SUSPEND_ALL_ROUTES
+    }
+
+    private CamelContext camelContext;
+    private String routeId;
+    private ScheduledExecutorService executorService;
+    private volatile ScheduledFuture task;
+    private volatile int doneMessages;
+    private AtomicBoolean actionDone = new AtomicBoolean();
+
+    private Action action = Action.STOP_ROUTE;
+    private int maxMessages;
+    private int maxSeconds;
+
+    public DurationRoutePolicy() {
+    }
+
+    public DurationRoutePolicy(CamelContext camelContext, String routeId) {
+        this.camelContext = camelContext;
+        this.routeId = routeId;
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    public int getMaxMessages() {
+        return maxMessages;
+    }
+
+    /**
+     * Maximum number of messages to process before the action is triggered
+     */
+    public void setMaxMessages(int maxMessages) {
+        this.maxMessages = maxMessages;
+    }
+
+    public int getMaxSeconds() {
+        return maxSeconds;
+    }
+
+    /**
+     * Maximum seconds Camel is running before the action is triggered
+     */
+    public void setMaxSeconds(int maxSeconds) {
+        this.maxSeconds = maxSeconds;
+    }
+
+    public Action getAction() {
+        return action;
+    }
+
+    /**
+     * What action to perform when maximum is triggered.
+     */
+    public void setAction(Action action) {
+        this.action = action;
+    }
+
+    @Override
+    public void onInit(Route route) {
+        super.onInit(route);
+
+        ObjectHelper.notNull(camelContext, "camelContext", this);
+
+        if (maxMessages == 0 && maxSeconds == 0) {
+            throw new IllegalArgumentException("The options maxMessages or maxSeconds must be configured");
+        }
+
+        if (routeId == null) {
+            this.routeId = route.getId();
+        }
+
+        if (executorService == null) {
+            executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "DurationRoutePolicy[" + routeId + "]");
+        }
+
+        if (maxSeconds > 0) {
+            task = performMaxDurationAction();
+        }
+    }
+
+    @Override
+    public void onExchangeDone(Route route, Exchange exchange) {
+        doneMessages++;
+
+        if (maxMessages > 0 && doneMessages >= maxMessages) {
+            if (actionDone.compareAndSet(false, true)) {
+                performMaxMessagesAction();
+                if (task != null && !task.isDone()) {
+                    task.cancel(false);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (task != null && !task.isDone()) {
+            task.cancel(false);
+        }
+
+        if (executorService != null) {
+            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
+    }
+
+    protected void performMaxMessagesAction() {
+        executorService.submit(createTask(true));
+    }
+
+    protected ScheduledFuture performMaxDurationAction() {
+        return executorService.schedule(createTask(false), maxSeconds, TimeUnit.SECONDS);
+    }
+
+    private Runnable createTask(boolean maxMessagesHit) {
+        return () -> {
+            try {
+                String tail;
+                if (maxMessagesHit) {
+                    tail = " due max messages " + getMaxMessages() + " processed";
+                } else {
+                    tail = " due max seconds " + getMaxSeconds();
+                }
+
+                if (action == Action.STOP_CAMEL_CONTEXT) {
+                    log.info("Stopping CamelContext {}", tail);
+                    camelContext.stop();
+                } else if (action == Action.STOP_ROUTE) {
+                    log.info("Stopping route: {}{}", routeId, tail);
+                    camelContext.stopRoute(routeId);
+                } else if (action == Action.SUSPEND_ROUTE) {
+                    log.info("Suspending route: {}{}", routeId, tail);
+                    camelContext.suspendRoute(routeId);
+                } else if (action == Action.SUSPEND_ALL_ROUTES) {
+                    log.info("Suspending all routes {}", tail);
+                    camelContext.suspend();
+                }
+            } catch (Throwable e) {
+                log.warn("Error performing action: " + action, e);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
new file mode 100644
index 0000000..849e00c
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DurationRoutePolicyFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.util.EndpointHelper;
+
+/**
+ * {@link org.apache.camel.spi.RoutePolicyFactory} which executes for a duration and then triggers an action.
+ * <p/>
+ * This can be used to stop a set of routes (or CamelContext) after it has processed a number of messages, or has been running for N seconds.
+ */
+public class DurationRoutePolicyFactory implements RoutePolicyFactory {
+
+    private String fromRouteId;
+    private int maxMessages;
+    private int maxSeconds;
+    private DurationRoutePolicy.Action action = DurationRoutePolicy.Action.STOP_ROUTE;
+
+    @Override
+    public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) {
+        DurationRoutePolicy policy = null;
+
+        if (fromRouteId == null || EndpointHelper.matchPattern(routeId, fromRouteId)) {
+            policy = new DurationRoutePolicy(camelContext, routeId);
+            policy.setMaxMessages(maxMessages);
+            policy.setMaxSeconds(maxSeconds);
+            policy.setAction(action);
+        }
+
+        return policy;
+    }
+
+    public String getFromRouteId() {
+        return fromRouteId;
+    }
+
+    /**
+     * Limit the route policy to the route which matches this pattern
+     *
+     * @see EndpointHelper#matchPattern(String, String)
+     */
+    public void setFromRouteId(String fromRouteId) {
+        this.fromRouteId = fromRouteId;
+    }
+
+    /**
+     * Maximum number of messages to process before the action is triggered
+     */
+    public void setMaxMessages(int maxMessages) {
+        this.maxMessages = maxMessages;
+    }
+
+    public int getMaxSeconds() {
+        return maxSeconds;
+    }
+
+    /**
+     * Maximum seconds Camel is running before the action is triggered
+     */
+    public void setMaxSeconds(int maxSeconds) {
+        this.maxSeconds = maxSeconds;
+    }
+
+    public DurationRoutePolicy.Action getAction() {
+        return action;
+    }
+
+    /**
+     * What action to perform when maximum is triggered.
+     */
+    public void setAction(DurationRoutePolicy.Action action) {
+        this.action = action;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
new file mode 100644
index 0000000..3c15f9e
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyFactoryTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyFactoryTest extends ContextTestSupport {
+
+    public void testDurationRoutePolicyFactory() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 2 seconds which is approx 20-30 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(10);
+        assertMockEndpointsSatisfied();
+
+        Exception cause = null;
+
+        // need a little time to stop async
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(100);
+            try {
+                assertFalse(context.getRouteStatus("foo").isStarted());
+                assertTrue(context.getRouteStatus("foo").isStopped());
+            } catch (Exception e) {
+                cause = e;
+            }
+        }
+
+        if (cause != null) {
+            throw cause;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicyFactory factory = new DurationRoutePolicyFactory();
+                factory.setMaxSeconds(2);
+                factory.setMaxMessages(25);
+
+                getContext().addRoutePolicyFactory(factory);
+
+                from("timer:foo?period=100").routeId("foo")
+                    .to("mock:foo");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
new file mode 100644
index 0000000..c7c873f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxMessagesTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyMaxMessagesTest extends ContextTestSupport {
+
+    public void testDurationRoutePolicy() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 5 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(5);
+        assertMockEndpointsSatisfied();
+
+        Exception cause = null;
+
+        // need a little time to stop async
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(100);
+            try {
+                assertFalse(context.getRouteStatus("foo").isStarted());
+                assertTrue(context.getRouteStatus("foo").isStopped());
+            } catch (Exception e) {
+                cause = e;
+            }
+        }
+
+        if (cause != null) {
+            throw cause;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicy policy = new DurationRoutePolicy();
+                policy.setMaxMessages(5);
+
+                from("timer:foo?period=100").routeId("foo").routePolicy(policy)
+                    .to("mock:foo");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/ce1f04e2/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
new file mode 100644
index 0000000..e0b0163
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/impl/DurationRoutePolicyMaxSecondsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class DurationRoutePolicyMaxSecondsTest extends ContextTestSupport {
+
+    public void testDurationRoutePolicy() throws Exception {
+        assertTrue(context.getRouteStatus("foo").isStarted());
+        assertFalse(context.getRouteStatus("foo").isStopped());
+
+        // the policy should stop the route after 2 seconds which is approx 20-30 messages
+        getMockEndpoint("mock:foo").expectedMinimumMessageCount(10);
+        assertMockEndpointsSatisfied();
+
+        Exception cause = null;
+
+        // need a little time to stop async
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(100);
+            try {
+                assertFalse(context.getRouteStatus("foo").isStarted());
+                assertTrue(context.getRouteStatus("foo").isStopped());
+            } catch (Exception e) {
+                cause = e;
+            }
+        }
+
+        if (cause != null) {
+            throw cause;
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                DurationRoutePolicy policy = new DurationRoutePolicy();
+                policy.setMaxSeconds(2);
+
+                from("timer:foo?period=100").routeId("foo").routePolicy(policy)
+                    .to("mock:foo");
+            }
+        };
+    }
+}