You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by lb...@apache.org on 2017/07/20 14:20:59 UTC
[1/2] camel git commit: CAMEL-11443: Add a RouteController SPI to
allow to customize routes life-cycle
Repository: camel
Updated Branches:
refs/heads/master 45616e17a -> e68111ec1
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTest.java b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
new file mode 100644
index 0000000..aa18956
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BackOffTest {
+
+    /**
+     * A back-off built with defaults must keep returning the default delay
+     * while attempts and elapsed time grow with every call.
+     */
+    @Test
+    public void testSimpleBackOff() {
+        final BackOffContext context = new BackOffContext(BackOff.builder().build());
+
+        assertConstantDelay(context, 5);
+    }
+
+    /**
+     * With a multiplier of 1.5 every delay must be 1.5 times the previous one
+     * (truncated to a long), starting from the default delay.
+     */
+    @Test
+    public void testBackOffWithMultiplier() {
+        final BackOffContext context = new BackOffContext(BackOff.builder().multiplier(1.5).build());
+
+        long previous = BackOff.DEFAULT_DELAY.toMillis();
+        long expectedElapsed = 0;
+
+        for (int attempt = 1; attempt <= 5; attempt++) {
+            final long expectedDelay = (long)(previous * 1.5);
+            final long current = context.next();
+            expectedElapsed += current;
+
+            Assert.assertEquals(attempt, context.getCurrentAttempts());
+            Assert.assertEquals(expectedDelay, current);
+            Assert.assertEquals(expectedDelay, context.getCurrentDelay());
+            Assert.assertEquals(expectedElapsed, context.getCurrentElapsedTime(), 0);
+
+            previous = current;
+        }
+    }
+
+    /**
+     * With maxAttempts set to 5 the first five calls behave like the default
+     * back-off and the sixth call must report {@code BackOff.NEVER}.
+     */
+    @Test
+    public void testBackOffWithMaxAttempts() {
+        final BackOffContext context = new BackOffContext(BackOff.builder().maxAttempts(5L).build());
+
+        assertConstantDelay(context, 5);
+        assertExhausted(context, 6);
+    }
+
+    /**
+     * With a maximum elapsed time of 9 seconds the first five calls behave like
+     * the default back-off and the sixth call must report {@code BackOff.NEVER}.
+     */
+    @Test
+    public void testBackOffWithMaxTime() {
+        final BackOffContext context = new BackOffContext(BackOff.builder().maxElapsedTime(9, TimeUnit.SECONDS).build());
+
+        assertConstantDelay(context, 5);
+        assertExhausted(context, 6);
+    }
+
+    /**
+     * Calls {@code next()} the given number of times and checks, after each call,
+     * the attempt counter, the returned and current delay (both the default
+     * delay) and the accumulated elapsed time.
+     */
+    private static void assertConstantDelay(BackOffContext context, int attempts) {
+        for (int attempt = 1; attempt <= attempts; attempt++) {
+            final long delay = context.next();
+
+            Assert.assertEquals(attempt, context.getCurrentAttempts());
+            Assert.assertEquals(BackOff.DEFAULT_DELAY.toMillis(), delay);
+            Assert.assertEquals(BackOff.DEFAULT_DELAY.toMillis(), context.getCurrentDelay());
+            Assert.assertEquals(BackOff.DEFAULT_DELAY.toMillis() * attempt, context.getCurrentElapsedTime());
+        }
+    }
+
+    /**
+     * Performs one more {@code next()} call and checks that the attempt counter
+     * reached the expected value and that the returned delay is {@code BackOff.NEVER}.
+     */
+    private static void assertExhausted(BackOffContext context, int expectedAttempts) {
+        final long delay = context.next();
+
+        Assert.assertEquals(expectedAttempts, context.getCurrentAttempts());
+        Assert.assertEquals(BackOff.NEVER, delay);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
new file mode 100644
index 0000000..5c2cc9f
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/util/backoff/BackOffTimerTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BackOffTimerTest {
+
+    /**
+     * The task returns {@code true} until its fifth invocation; the completion
+     * stage then expects exactly five invocations to have happened.
+     */
+    @Test
+    public void testBackOffTimer() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
+
+        try {
+            final BackOff backOff = BackOff.builder().delay(100).build();
+            final BackOffTimer timer = new BackOffTimer(executor);
+
+            timer.schedule(
+                backOff,
+                context -> {
+                    Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
+                    Assert.assertEquals(100, context.getCurrentDelay());
+                    Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
+
+                    return counter.get() < 5;
+                }
+            ).thenAccept(
+                context -> {
+                    Assert.assertEquals(5, counter.get());
+                }
+            ).get(5, TimeUnit.SECONDS);
+        } finally {
+            // Release the pool threads even when the future or an assertion fails.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * The task always returns {@code true}; with maxAttempts set to 5 the
+     * completion stage expects exactly five invocations to have happened.
+     */
+    @Test
+    public void testBackOffTimerWithMaxAttempts() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
+
+        try {
+            final BackOff backOff = BackOff.builder().delay(100).maxAttempts(5L).build();
+            final BackOffTimer timer = new BackOffTimer(executor);
+
+            timer.schedule(
+                backOff,
+                context -> {
+                    Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
+                    Assert.assertEquals(100, context.getCurrentDelay());
+                    Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
+
+                    return true;
+                }
+            ).thenAccept(
+                context -> {
+                    Assert.assertEquals(5, counter.get());
+                }
+            ).get(5, TimeUnit.SECONDS);
+        } finally {
+            // Release the pool threads even when the future or an assertion fails.
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * The task always returns {@code true}; with maxElapsedTime set to 400 the
+     * completion stage expects at most five invocations to have happened.
+     */
+    @Test
+    public void testBackOffTimerWithMaxElapsedTime() throws Exception {
+        final AtomicInteger counter = new AtomicInteger(0);
+        final ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
+
+        try {
+            final BackOff backOff = BackOff.builder().delay(100).maxElapsedTime(400).build();
+            final BackOffTimer timer = new BackOffTimer(executor);
+
+            timer.schedule(
+                backOff,
+                context -> {
+                    Assert.assertEquals(counter.incrementAndGet(), context.getCurrentAttempts());
+                    Assert.assertEquals(100, context.getCurrentDelay());
+                    Assert.assertEquals(100 * counter.get(), context.getCurrentElapsedTime());
+
+                    return true;
+                }
+            ).thenAccept(
+                context -> {
+                    Assert.assertTrue(counter.get() <= 5);
+                }
+            ).get(5, TimeUnit.SECONDS);
+        } finally {
+            // Release the pool threads even when the future or an assertion fails.
+            executor.shutdownNow();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 64e9937..7f134f3 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -102,6 +102,7 @@ import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanFilter;
import org.apache.camel.spi.ProcessorFactory;
import org.apache.camel.spi.RestConfiguration;
+import org.apache.camel.spi.RouteController;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.RuntimeEndpointRegistry;
import org.apache.camel.spi.ShutdownStrategy;
@@ -339,6 +340,13 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
}
}
+        // Route controller: install a custom controller only when one is actually
+        // found; the guard must test the bean that is about to be set, otherwise a
+        // null controller could be pushed into the context.
+        RouteController routeController = getBeanForType(RouteController.class);
+        if (routeController != null) {
+            LOG.info("Using RouteController: " + routeController);
+            getContext().setRouteController(routeController);
+        }
+
// set the default thread pool profile if defined
initThreadPoolProfiles(getContext());
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java
index 690c632..6605029 100644
--- a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java
+++ b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/CamelAutoConfiguration.java
@@ -46,6 +46,7 @@ import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementNamingStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.ReloadStrategy;
+import org.apache.camel.spi.RouteController;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.RuntimeEndpointRegistry;
import org.apache.camel.spi.ShutdownStrategy;
@@ -439,6 +440,13 @@ public class CamelAutoConfiguration {
camelContext.setSSLContextParameters(sslContextParametersSupplier.get());
}
+ // Route controller
+ RouteController routeController = getSingleBeanOfType(applicationContext, RouteController.class);
+ if (routeController != null) {
+ LOG.info("Using RouteController: " + routeController);
+ camelContext.setRouteController(routeController);
+ }
+
// set the default thread pool profile if defined
initThreadPoolProfiles(applicationContext, camelContext);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesEndpoint.java b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesEndpoint.java
index 9e86ed4..208f2df 100644
--- a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesEndpoint.java
+++ b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesEndpoint.java
@@ -18,6 +18,8 @@ package org.apache.camel.spring.boot.actuate.endpoint;
import java.util.Date;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.fasterxml.jackson.annotation.JsonInclude;
@@ -27,6 +29,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.Route;
import org.apache.camel.StatefulService;
import org.apache.camel.api.management.mbean.ManagedRouteMBean;
+import org.apache.camel.spi.RouteError;
import org.apache.camel.spring.boot.actuate.endpoint.CamelRoutesEndpoint.RouteEndpointInfo;
import org.springframework.boot.actuate.endpoint.AbstractEndpoint;
import org.springframework.boot.actuate.endpoint.Endpoint;
@@ -67,6 +70,30 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
return null;
}
+    /**
+     * Starts a route through the route controller of the Camel context.
+     *
+     * @param id the id of the route to start
+     * @throws Exception whatever the route controller throws while starting the route
+     */
+    public void startRoute(String id) throws Exception {
+        this.camelContext.getRouteController().startRoute(id);
+    }
+
+    /**
+     * Stops a route through the route controller of the Camel context.
+     * <p>
+     * Without a timeout the plain stop operation is used and the other two
+     * arguments are ignored. With a timeout, a missing time unit defaults to
+     * seconds and a missing abort flag defaults to {@code true}.
+     *
+     * @param id the id of the route to stop
+     * @param timeout optional timeout for the stop operation
+     * @param timeUnit optional unit of the timeout
+     * @param abortAfterTimeout optional flag handed over to the route controller together with the timeout
+     * @throws Exception whatever the route controller throws while stopping the route
+     */
+    public void stopRoute(String id, Optional<Long> timeout, Optional<TimeUnit> timeUnit, Optional<Boolean> abortAfterTimeout) throws Exception {
+        if (!timeout.isPresent()) {
+            camelContext.getRouteController().stopRoute(id);
+            return;
+        }
+
+        camelContext.getRouteController().stopRoute(
+            id,
+            timeout.get(),
+            timeUnit.orElse(TimeUnit.SECONDS),
+            abortAfterTimeout.orElse(Boolean.TRUE));
+    }
+
+    /**
+     * Suspends a route through the route controller of the Camel context.
+     * <p>
+     * Without a timeout the plain suspend operation is used and the time unit
+     * is ignored. With a timeout, a missing time unit defaults to seconds.
+     *
+     * @param id the id of the route to suspend
+     * @param timeout optional timeout for the suspend operation
+     * @param timeUnit optional unit of the timeout
+     * @throws Exception whatever the route controller throws while suspending the route
+     */
+    public void suspendRoute(String id, Optional<Long> timeout, Optional<TimeUnit> timeUnit) throws Exception {
+        if (!timeout.isPresent()) {
+            camelContext.getRouteController().suspendRoute(id);
+            return;
+        }
+
+        camelContext.getRouteController().suspendRoute(
+            id,
+            timeout.get(),
+            timeUnit.orElse(TimeUnit.SECONDS));
+    }
+
+    /**
+     * Resumes a route through the route controller of the Camel context.
+     *
+     * @param id the id of the route to resume
+     * @throws Exception whatever the route controller throws while resuming the route
+     */
+    public void resumeRoute(String id) throws Exception {
+        this.camelContext.getRouteController().resumeRoute(id);
+    }
+
/**
* Container for exposing {@link org.apache.camel.Route} information as JSON.
*/
@@ -82,7 +109,7 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
private final long uptimeMillis;
- private String status;
+ private final String status;
public RouteEndpointInfo(Route route) {
this.id = route.getId();
@@ -92,6 +119,8 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
if (route instanceof StatefulService) {
this.status = ((StatefulService) route).getStatus().name();
+ } else {
+ this.status = null;
}
}
@@ -127,16 +156,12 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
public RouteDetailsEndpointInfo(final CamelContext camelContext, final Route route) {
super(route);
+
if (camelContext.getManagementStrategy().getManagementAgent() != null) {
- this.routeDetails = new RouteDetails(camelContext.getManagedRoute(route.getId(),
- ManagedRouteMBean.class));
+ this.routeDetails = new RouteDetails(camelContext.getManagedRoute(route.getId(), ManagedRouteMBean.class));
}
}
- public RouteDetails getRouteDetails() {
- return routeDetails;
- }
-
@JsonInclude(JsonInclude.Include.NON_EMPTY)
static class RouteDetails {
@@ -188,6 +213,10 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
private long totalProcessingTime;
+ private RouteError lastError;
+
+ private boolean hasRouteController;
+
RouteDetails(ManagedRouteMBean managedRoute) {
try {
this.deltaProcessingTime = managedRoute.getDeltaProcessingTime();
@@ -214,6 +243,8 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
this.oldestInflightExchangeId = managedRoute.getOldestInflightExchangeId();
this.redeliveries = managedRoute.getRedeliveries();
this.totalProcessingTime = managedRoute.getTotalProcessingTime();
+ this.lastError = managedRoute.getLastError();
+ this.hasRouteController = managedRoute.getHasRouteController();
} catch (Exception e) {
// Ignore
}
@@ -314,6 +345,14 @@ public class CamelRoutesEndpoint extends AbstractEndpoint<List<RouteEndpointInfo
public long getTotalProcessingTime() {
return totalProcessingTime;
}
+
+            /**
+             * @return the last error captured from the managed route when these
+             *         details were built; may be {@code null}
+             */
+            public RouteError getLastError() {
+                return this.lastError;
+            }
+
+            /**
+             * @return the route-controller flag captured from the managed route
+             *         when these details were built
+             */
+            public boolean getHasRouteController() {
+                return this.hasRouteController;
+            }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpoint.java b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpoint.java
index c90e187..9933c78 100644
--- a/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpoint.java
+++ b/components/camel-spring-boot/src/main/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpoint.java
@@ -16,15 +16,23 @@
*/
package org.apache.camel.spring.boot.actuate.endpoint;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
import org.springframework.boot.actuate.endpoint.mvc.ActuatorMediaTypes;
import org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter;
import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.ResponseStatus;
/**
* Adapter to expose {@link CamelRoutesEndpoint} as an {@link MvcEndpoint}.
@@ -42,20 +50,157 @@ public class CamelRoutesMvcEndpoint extends EndpointMvcAdapter {
this.delegate = delegate;
}
- @GetMapping(value = "/{id}", produces = {
+ // ********************************************
+ // Endpoints
+ // ********************************************
+
+ /**
+ * Returns the detailed information of a single route (GET {@code /{id}/info}).
+ * <p>
+ * When the delegate endpoint is disabled the disabled response is returned
+ * instead. When the delegate has no details for the id a
+ * {@link NoSuchRouteException} is thrown, which is annotated with HTTP 404.
+ *
+ * @param id the id of the route, taken from the request path
+ * @return the route details provided by the delegate, or the disabled response
+ */
+ @ResponseBody
+ @GetMapping(
+ value = "/{id}/info",
+ produces = {
ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE,
- MediaType.APPLICATION_JSON_VALUE })
+ MediaType.APPLICATION_JSON_VALUE
+ }
+ )
+ public Object info(
+ @PathVariable String id) {
+
+ return doIfEnabled(() -> {
+ Object result = delegate.getRouteDetailsInfo(id);
+ if (result == null) {
+ throw new NoSuchRouteException("No such route " + id);
+ }
+
+ return result;
+ });
+ }
+
+ /**
+ * Stops a route (POST {@code /{id}/stop}).
+ * <p>
+ * The timeout, when given, is always handed to the delegate in seconds. Any
+ * exception raised by the delegate is wrapped in a {@link GenericException},
+ * which is annotated with HTTP 500. When the delegate endpoint is disabled
+ * the disabled response is returned and nothing is stopped.
+ * <p>
+ * NOTE(review): {@code @RequestAttribute} binds servlet request attributes
+ * (typically populated by filters or interceptors), not query or form
+ * parameters. Confirm that callers are expected to supply {@code timeout}
+ * and {@code abortAfterTimeout} this way; {@code @RequestParam} may have
+ * been intended.
+ *
+ * @param id the id of the route, taken from the request path
+ * @param timeout optional timeout in seconds
+ * @param abortAfterTimeout optional flag forwarded to the delegate together with the timeout
+ * @return an empty 200 OK response entity, or the disabled response
+ */
@ResponseBody
- public Object get(@PathVariable String id) {
+ @PostMapping(
+ value = "/{id}/stop",
+ produces = {
+ ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE,
+ MediaType.APPLICATION_JSON_VALUE
+ }
+ )
+ public Object stop(
+ @PathVariable String id,
+ @RequestAttribute(required = false) Long timeout,
+ @RequestAttribute(required = false) Boolean abortAfterTimeout) {
+
+ return doIfEnabled(() -> {
+ try {
+ delegate.stopRoute(
+ id,
+ Optional.ofNullable(timeout),
+ Optional.of(TimeUnit.SECONDS),
+ Optional.ofNullable(abortAfterTimeout)
+ );
+ } catch (Exception e) {
+ throw new GenericException("Error stopping route " + id, e);
+ }
+
+ return ResponseEntity.ok().build();
+ });
+ }
+
+    /**
+     * Starts a route (POST {@code /{id}/start}).
+     * <p>
+     * Any exception raised by the delegate is wrapped in a {@link GenericException}.
+     * When the delegate endpoint is disabled the disabled response is returned
+     * and nothing is started.
+     *
+     * @param id the id of the route, taken from the request path
+     * @return an empty 200 OK response entity, or the disabled response
+     */
+    @ResponseBody
+    @PostMapping(
+        value = "/{id}/start",
+        produces = {ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE, MediaType.APPLICATION_JSON_VALUE}
+    )
+    public Object start(@PathVariable String id) {
+        final Supplier<Object> action = () -> {
+            try {
+                delegate.startRoute(id);
+            } catch (Exception cause) {
+                throw new GenericException("Error starting route " + id, cause);
+            }
+
+            return ResponseEntity.ok().build();
+        };
+
+        return doIfEnabled(action);
+    }
+
+    /**
+     * Suspends a route (POST {@code /{id}/suspend}).
+     * <p>
+     * The timeout, when given, is always handed to the delegate in seconds. Any
+     * exception raised by the delegate is wrapped in a {@link GenericException}.
+     * When the delegate endpoint is disabled the disabled response is returned
+     * and nothing is suspended.
+     * <p>
+     * NOTE(review): {@code @RequestAttribute} binds servlet request attributes,
+     * not query or form parameters. Confirm that callers are expected to supply
+     * {@code timeout} this way; {@code @RequestParam} may have been intended.
+     *
+     * @param id the id of the route, taken from the request path
+     * @param timeout optional timeout in seconds
+     * @return an empty 200 OK response entity, or the disabled response
+     */
+    @ResponseBody
+    @PostMapping(
+        value = "/{id}/suspend",
+        produces = {ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE, MediaType.APPLICATION_JSON_VALUE}
+    )
+    public Object suspend(
+            @PathVariable String id,
+            @RequestAttribute(required = false) Long timeout) {
+
+        final Supplier<Object> action = () -> {
+            final Optional<Long> requestedTimeout = Optional.ofNullable(timeout);
+            final Optional<TimeUnit> unit = Optional.of(TimeUnit.SECONDS);
+
+            try {
+                delegate.suspendRoute(id, requestedTimeout, unit);
+            } catch (Exception cause) {
+                throw new GenericException("Error suspending route " + id, cause);
+            }
+
+            return ResponseEntity.ok().build();
+        };
+
+        return doIfEnabled(action);
+    }
+
+    /**
+     * Resumes a route (POST {@code /{id}/resume}).
+     * <p>
+     * Any exception raised by the delegate is wrapped in a {@link GenericException}.
+     * When the delegate endpoint is disabled the disabled response is returned
+     * and nothing is resumed.
+     *
+     * @param id the id of the route, taken from the request path
+     * @return an empty 200 OK response entity, or the disabled response
+     */
+    @ResponseBody
+    @PostMapping(
+        value = "/{id}/resume",
+        produces = {ActuatorMediaTypes.APPLICATION_ACTUATOR_V1_JSON_VALUE, MediaType.APPLICATION_JSON_VALUE}
+    )
+    public Object resume(@PathVariable String id) {
+        final Supplier<Object> action = () -> {
+            try {
+                delegate.resumeRoute(id);
+            } catch (Exception cause) {
+                throw new GenericException("Error resuming route " + id, cause);
+            }
+
+            return ResponseEntity.ok().build();
+        };
+
+        return doIfEnabled(action);
+    }
+
+ // ********************************************
+ // Helpers
+ // ********************************************
+
+ /**
+ * Runs the given action only when the delegate endpoint is enabled.
+ *
+ * @param supplier the action producing the response
+ * @return the disabled response when the delegate is disabled (the supplier
+ * is not invoked in that case), otherwise whatever the supplier returns
+ */
+ private Object doIfEnabled(Supplier<Object> supplier) {
if (!delegate.isEnabled()) {
return getDisabledResponse();
}
- Object result = delegate.getRouteDetailsInfo(id);
- if (result == null) {
- result = NOT_FOUND;
+ return supplier.get();
+ }
+
+ /**
+ * Unchecked wrapper for failures of the route operations of this endpoint.
+ * It is annotated with HTTP 500 (internal server error).
+ */
+ @SuppressWarnings("serial")
+ @ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
+ public static class GenericException extends RuntimeException {
+ public GenericException(String message, Throwable cause) {
+ super(message, cause);
+
}
+ }
- return result;
+ /**
+ * Thrown when no route exists for a requested id. It is annotated with
+ * HTTP 404 and the fixed reason "No such route".
+ * NOTE(review): with a fixed {@code reason} the exception message (which
+ * carries the route id) is presumably not what ends up in the HTTP
+ * response - confirm against the Spring {@code @ResponseStatus} contract.
+ */
+ @SuppressWarnings("serial")
+ @ResponseStatus(value = HttpStatus.NOT_FOUND, reason = "No such route")
+ public static class NoSuchRouteException extends RuntimeException {
+ public NoSuchRouteException(String message) {
+ super(message);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpointTest.java b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpointTest.java
index 63e47cf..9d752f2 100644
--- a/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpointTest.java
+++ b/components/camel-spring-boot/src/test/java/org/apache/camel/spring/boot/actuate/endpoint/CamelRoutesMvcEndpointTest.java
@@ -59,7 +59,7 @@ public class CamelRoutesMvcEndpointTest extends Assert {
@Test
public void testMvcRoutesEndpoint() throws Exception {
- Object result = endpoint.get("foo-route");
+ Object result = endpoint.info("foo-route");
assertTrue(result instanceof RouteDetailsEndpointInfo);
assertEquals("foo-route", ((RouteDetailsEndpointInfo)result).getId());
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/camel-example-spring-boot-routecontroller/pom.xml
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-routecontroller/pom.xml b/examples/camel-example-spring-boot-routecontroller/pom.xml
new file mode 100644
index 0000000..b30b67e
--- /dev/null
+++ b/examples/camel-example-spring-boot-routecontroller/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel.example</groupId>
+ <artifactId>examples</artifactId>
+ <version>2.20.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-example-spring-boot-routecontroller</artifactId>
+ <name>Camel :: Example :: Spring Boot :: Route Controller</name>
+ <description>An example showing how to work with Camel Route Controller and Spring Boot</description>
+
+ <properties>
+ <category>Beginner</category>
+
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ <spring.boot-version>${spring-boot-version}</spring.boot-version>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- Spring Boot BOM -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-dependencies</artifactId>
+ <version>${spring.boot-version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <!-- Camel BOM -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring-boot-dependencies</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.jolokia</groupId>
+ <artifactId>jolokia-core</artifactId>
+ </dependency>
+
+ <!-- Spring Boot -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-undertow</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-actuator</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-configuration-processor</artifactId>
+ <optional>true</optional>
+ <version>${spring-boot-version}</version>
+ </dependency>
+
+ <!-- Camel -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring-boot-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-undertow-starter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-stream-starter</artifactId>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-test-spring</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ <version>${spring-boot-version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>repackage</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>jdk9-build</id>
+ <activation>
+ <jdk>9</jdk>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <argLine>--add-modules java.xml.bind --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/camel-example-spring-boot-routecontroller/readme.adoc
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-routecontroller/readme.adoc b/examples/camel-example-spring-boot-routecontroller/readme.adoc
new file mode 100644
index 0000000..5b54d9c
--- /dev/null
+++ b/examples/camel-example-spring-boot-routecontroller/readme.adoc
@@ -0,0 +1,44 @@
+# Camel Route Controller Example Spring Boot
+
+This example shows how to work with a simple Apache Camel application using Spring Boot and a Route Controller.
+
+## How to run
+
+You can run this example using
+
+ mvn spring-boot:run
+
+Besides JMX, you can use the Spring Boot endpoints to interact with the routes:
+
+* To get info about the routes
++
+[source]
+----
+curl -XGET -s http://localhost:8080/camel/routes
+----
+
+* To get info about a route
++
+[source]
+----
+curl -XGET -s http://localhost:8080/camel/routes/{id}/info
+----
+
+* To stop a route
++
+[source]
+----
+curl -XPOST -s http://localhost:8080/camel/routes/{id}/stop
+----
+
+* To start a route
++
+[source]
+----
+curl -XPOST -s http://localhost:8080/camel/routes/{id}/start
+----
+
+
+## More information
+
+You can find more information about Apache Camel at the website: http://camel.apache.org/
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/Application.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/Application.java b/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/Application.java
new file mode 100644
index 0000000..2a97fed
--- /dev/null
+++ b/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/Application.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sample.camel;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+//CHECKSTYLE:OFF
+/**
+ * A sample Spring Boot application that starts the Camel routes.
+ * It only carries the {@code @SpringBootApplication} annotation and the
+ * {@code main} entry point.
+ */
+@SpringBootApplication
+public class Application {
+
+ /**
+ * A main method to start this application by handing this class and the
+ * command line arguments over to {@code SpringApplication.run}.
+ *
+ * @param args the command line arguments, passed through unchanged
+ */
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+}
+//CHECKSTYLE:ON
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/ApplicationRoutes.java
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/ApplicationRoutes.java b/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/ApplicationRoutes.java
new file mode 100644
index 0000000..bd1473a
--- /dev/null
+++ b/examples/camel-example-spring-boot-routecontroller/src/main/java/sample/camel/ApplicationRoutes.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sample.camel;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.springframework.stereotype.Component;
+
+/**
+ * Routes of the route controller sample, registered as a Spring component.
+ * <p>
+ * Three routes are defined:
+ * <ul>
+ * <li>{@code foo}: consumes from {@code timer:foo} with a 5s period, startup order 2;</li>
+ * <li>{@code bar}: consumes from {@code timer:bar} with a 5s period, startup order 1;</li>
+ * <li>{@code undertow}: consumes from {@code undertow:http://localhost:9011}, no explicit startup order.</li>
+ * </ul>
+ * Each route only logs a message. The ids given here are the keys used under
+ * {@code camel.supervising.controller.routes.*} in {@code application.properties}.
+ */
+@Component
+public class ApplicationRoutes extends RouteBuilder {
+ @Override
+ public void configure() throws Exception {
+ from("timer:foo?period=5s")
+ .id("foo")
+ .startupOrder(2)
+ .log("From timer (foo) ...");
+
+ from("timer:bar?period=5s")
+ .id("bar")
+ .startupOrder(1)
+ .log("From timer (bar) ...");
+
+ from("undertow:http://localhost:9011")
+ .id("undertow")
+ .log("From undertow ...");
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/camel-example-spring-boot-routecontroller/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/examples/camel-example-spring-boot-routecontroller/src/main/resources/application.properties b/examples/camel-example-spring-boot-routecontroller/src/main/resources/application.properties
new file mode 100644
index 0000000..89ea7d1
--- /dev/null
+++ b/examples/camel-example-spring-boot-routecontroller/src/main/resources/application.properties
@@ -0,0 +1,40 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+debug = false
+
+logging.level.org.springframework = INFO
+logging.level.org.apache.camel.spring.boot = INFO
+logging.level.org.apache.camel.impl = DEBUG
+logging.level.org.apache.camel.util.backoff = DEBUG
+logging.level.sample.camel = DEBUG
+
+endpoints.enabled = false
+endpoints.jmx.enabled = false
+endpoints.health.enabled = true
+endpoints.camelroutes.path = /camel/routes
+endpoints.camelroutes.enabled = true
+
+management.security.enabled = false
+
+camel.springboot.name = SampleSupervisingRouteController
+
+camel.supervising.controller.enabled = true
+camel.supervising.controller.back-off.delay = 5s
+camel.supervising.controller.back-off.max-attempts = 10
+camel.supervising.controller.routes.undertow.back-off.delay = 10s
+camel.supervising.controller.routes.undertow.back-off.max-attempts = 3
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index ecbae00..d835d62 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -100,6 +100,7 @@
<module>camel-example-spring-boot-metrics</module>
<module>camel-example-spring-boot-rest-jpa</module>
<module>camel-example-spring-boot-rest-swagger</module>
+ <module>camel-example-spring-boot-routecontroller</module>
<module>camel-example-spring-boot-servicecall</module>
<module>camel-example-spring-cloud-servicecall</module>
<module>camel-example-spring-javaconfig</module>
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerAutoConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerAutoConfiguration.java
new file mode 100644
index 0000000..d872ec3
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerAutoConfiguration.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.springboot;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.camel.converter.TimePatternConverter;
+import org.apache.camel.impl.SupervisingRouteController;
+import org.apache.camel.impl.springboot.SupervisingRouteControllerConfiguration.BackOffConfiguration;
+import org.apache.camel.impl.springboot.SupervisingRouteControllerConfiguration.RouteConfiguration;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.util.backoff.BackOff;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Scope;
+
+/**
+ * Spring Boot auto-configuration that exposes a {@link SupervisingRouteController}
+ * as the {@link RouteController} bean, configured from the
+ * {@code camel.supervising.controller.*} properties.
+ * <p>
+ * It is conditional on the {@code camel.supervising.controller.enabled} property,
+ * is ordered before {@link CamelAutoConfiguration}, and the bean is only created
+ * when no other {@link RouteController} bean is defined.
+ */
+@Configuration
+@AutoConfigureBefore(CamelAutoConfiguration.class)
+@ConditionalOnProperty(prefix = "camel.supervising.controller", name = "enabled")
+@EnableConfigurationProperties(SupervisingRouteControllerConfiguration.class)
+public class SupervisingRouteControllerAutoConfiguration {
+    @Autowired
+    private SupervisingRouteControllerConfiguration configuration;
+
+    /**
+     * Creates the supervising route controller.
+     * <p>
+     * The default back-off is built from the top-level {@code back-off} section; every
+     * entry under {@code routes} then gets its own back-off that starts from the
+     * controller's default and overrides only the values set for that route.
+     *
+     * @return the configured controller
+     */
+    @Bean
+    @Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
+    @ConditionalOnMissingBean
+    public RouteController routeController() {
+        SupervisingRouteController controller = new SupervisingRouteController();
+
+        // No template for the default: it is built from the builder's own defaults.
+        controller.setDefaultBackOff(configureBackOff(null, configuration.getBackOff()));
+
+        for (Map.Entry<String, RouteConfiguration> entry : configuration.getRoutes().entrySet()) {
+            final RouteConfiguration route = entry.getValue();
+
+            controller.setBackOff(
+                entry.getKey(),
+                // A route entry may come without a back-off section; it then simply
+                // inherits the default instead of failing with a NullPointerException.
+                configureBackOff(controller.getDefaultBackOff(), route != null ? route.getBackOff() : null)
+            );
+        }
+
+        return controller;
+    }
+
+    /**
+     * Builds a {@link BackOff} from the given configuration.
+     *
+     * @param template back-off whose values seed the builder, or {@code null} to start
+     *                 from the builder defaults
+     * @param conf     values to apply on top of the template; unset (null) values and a
+     *                 {@code null} configuration leave the seeded values untouched
+     * @return the resulting back-off
+     */
+    private BackOff configureBackOff(BackOff template, BackOffConfiguration conf) {
+        final BackOff.Builder builder = template != null ? BackOff.builder().read(template) : BackOff.builder();
+
+        if (conf == null) {
+            return builder.build();
+        }
+
+        // Durations are configured as time patterns (e.g. "5s") and converted to milliseconds.
+        Optional.ofNullable(conf.getDelay()).map(TimePatternConverter::toMilliSeconds).ifPresent(builder::delay);
+        Optional.ofNullable(conf.getMaxDelay()).map(TimePatternConverter::toMilliSeconds).ifPresent(builder::maxDelay);
+        Optional.ofNullable(conf.getMaxElapsedTime()).map(TimePatternConverter::toMilliSeconds).ifPresent(builder::maxElapsedTime);
+        Optional.ofNullable(conf.getMaxAttempts()).ifPresent(builder::maxAttempts);
+        Optional.ofNullable(conf.getMultiplier()).ifPresent(builder::multiplier);
+
+        return builder.build();
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerConfiguration.java
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerConfiguration.java b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerConfiguration.java
new file mode 100644
index 0000000..4460671
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-core-starter/src/main/java/org/apache/camel/impl/springboot/SupervisingRouteControllerConfiguration.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.springboot;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "camel.supervising.controller")
+public class SupervisingRouteControllerConfiguration {
+ private boolean enabled;
+ private BackOffConfiguration backOff = new BackOffConfiguration();
+ private Map<String, RouteConfiguration> routes = new HashMap<>();
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public BackOffConfiguration getBackOff() {
+ return backOff;
+ }
+
+ public Map<String, RouteConfiguration> getRoutes() {
+ return routes;
+ }
+
+ // *****************************************
+ // Configuration Classes
+ // *****************************************
+
+ public static class RouteConfiguration {
+ private BackOffConfiguration backOff;
+
+ public BackOffConfiguration getBackOff() {
+ return backOff;
+ }
+
+ public void setBackOff(BackOffConfiguration backOff) {
+ this.backOff = backOff;
+ }
+ }
+
+ public static class BackOffConfiguration {
+ private String delay;
+ private String maxDelay;
+ private String maxElapsedTime;
+ private Long maxAttempts;
+ private Double multiplier;
+
+ public String getDelay() {
+ return delay;
+ }
+
+ public void setDelay(String delay) {
+ this.delay = delay;
+ }
+
+ public String getMaxDelay() {
+ return maxDelay;
+ }
+
+ public void setMaxDelay(String maxDelay) {
+ this.maxDelay = maxDelay;
+ }
+
+ public String getMaxElapsedTime() {
+ return maxElapsedTime;
+ }
+
+ public void setMaxElapsedTime(String maxElapsedTime) {
+ this.maxElapsedTime = maxElapsedTime;
+ }
+
+ public Long getMaxAttempts() {
+ return maxAttempts;
+ }
+
+ public void setMaxAttempts(Long maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ }
+
+ public Double getMultiplier() {
+ return multiplier;
+ }
+
+ public void setMultiplier(Double multiplier) {
+ this.multiplier = multiplier;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/platforms/spring-boot/components-starter/camel-core-starter/src/main/resources/META-INF/spring.factories
----------------------------------------------------------------------
diff --git a/platforms/spring-boot/components-starter/camel-core-starter/src/main/resources/META-INF/spring.factories b/platforms/spring-boot/components-starter/camel-core-starter/src/main/resources/META-INF/spring.factories
index 253b608..656661c 100644
--- a/platforms/spring-boot/components-starter/camel-core-starter/src/main/resources/META-INF/spring.factories
+++ b/platforms/spring-boot/components-starter/camel-core-starter/src/main/resources/META-INF/spring.factories
@@ -29,6 +29,7 @@ org.apache.camel.impl.springboot.StringDataFormatAutoConfiguration,\
org.apache.camel.impl.springboot.ZipDataFormatAutoConfiguration,\
org.apache.camel.impl.springboot.GzipDataFormatAutoConfiguration,\
org.apache.camel.impl.springboot.SerializationDataFormatAutoConfiguration,\
+org.apache.camel.impl.springboot.SupervisingRouteControllerAutoConfiguration,\
org.apache.camel.language.constant.springboot.ConstantLanguageAutoConfiguration,\
org.apache.camel.language.simple.springboot.SimpleLanguageAutoConfiguration,\
org.apache.camel.language.ref.springboot.RefLanguageAutoConfiguration,\
[2/2] camel git commit: CAMEL-11443: Add a RouteController SPI to
allow to customize routes life-cycle
Posted by lb...@apache.org.
CAMEL-11443: Add a RouteController SPI to allow to customize routes life-cycle
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e68111ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e68111ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e68111ec
Branch: refs/heads/master
Commit: e68111ec13c85b5a2596797f404c1f4e433631e9
Parents: 45616e1
Author: lburgazzoli <lb...@gmail.com>
Authored: Mon Jul 10 18:17:22 2017 +0200
Committer: lburgazzoli <lb...@gmail.com>
Committed: Thu Jul 20 16:20:11 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 15 +
.../java/org/apache/camel/Experimental.java | 32 ++
.../api/management/mbean/ManagedRouteMBean.java | 8 +
.../apache/camel/impl/DefaultCamelContext.java | 331 +++++++----
.../apache/camel/impl/DefaultRouteContext.java | 23 +
.../camel/impl/DefaultRouteController.java | 107 ++++
.../apache/camel/impl/DefaultRouteError.java | 67 +++
.../org/apache/camel/impl/RouteService.java | 1 +
.../camel/impl/SupervisingRouteController.java | 556 +++++++++++++++++++
.../camel/management/mbean/ManagedRoute.java | 19 +-
.../java/org/apache/camel/spi/RouteContext.java | 36 ++
.../org/apache/camel/spi/RouteController.java | 41 ++
.../java/org/apache/camel/spi/RouteError.java | 42 ++
.../org/apache/camel/util/ObjectHelper.java | 15 +
.../org/apache/camel/util/backoff/BackOff.java | 181 ++++++
.../camel/util/backoff/BackOffContext.java | 101 ++++
.../apache/camel/util/backoff/BackOffTimer.java | 97 ++++
.../org/apache/camel/util/backoff/package.html | 25 +
.../apache/camel/util/backoff/BackOffTest.java | 102 ++++
.../camel/util/backoff/BackOffTimerTest.java | 105 ++++
.../xml/AbstractCamelContextFactoryBean.java | 8 +
.../spring/boot/CamelAutoConfiguration.java | 8 +
.../actuate/endpoint/CamelRoutesEndpoint.java | 53 +-
.../endpoint/CamelRoutesMvcEndpoint.java | 159 +++++-
.../endpoint/CamelRoutesMvcEndpointTest.java | 2 +-
.../pom.xml | 154 +++++
.../readme.adoc | 44 ++
.../src/main/java/sample/camel/Application.java | 37 ++
.../java/sample/camel/ApplicationRoutes.java | 43 ++
.../src/main/resources/application.properties | 40 ++
examples/pom.xml | 1 +
...rvisingRouteControllerAutoConfiguration.java | 79 +++
...SupervisingRouteControllerConfiguration.java | 109 ++++
.../main/resources/META-INF/spring.factories | 1 +
34 files changed, 2512 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 6568121..98d0d6c 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -72,6 +72,7 @@ import org.apache.camel.spi.Registry;
import org.apache.camel.spi.ReloadStrategy;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.spi.RestRegistry;
+import org.apache.camel.spi.RouteController;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.RuntimeEndpointRegistry;
@@ -506,6 +507,20 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
//-----------------------------------------------------------------------
/**
+ * Sets the route controller. NOTE: experimental API.
+ *
+ * @param routeController the route controller
+ */
+ void setRouteController(RouteController routeController);
+
+ /**
+ * Gets the route controller. NOTE: experimental API.
+ *
+ * @return the route controller or null if not set.
+ */
+ RouteController getRouteController();
+
+ /**
* Method to signal to {@link CamelContext} that the process to initialize setup routes is in progress.
*
* @param done <tt>false</tt> to start the process, call again with <tt>true</tt> to signal its done.
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/Experimental.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Experimental.java b/camel-core/src/main/java/org/apache/camel/Experimental.java
new file mode 100644
index 0000000..6ea6c1f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/Experimental.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks an experimental user-facing API. Experimental APIs might change or be
+ * removed in minor versions.
+ * <p>
+ * The annotation is retained at runtime and can be placed on types and methods.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.TYPE, ElementType.METHOD})
+public @interface Experimental {
+ int revision() default 1;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index 010a6dd..409a654 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -16,8 +16,10 @@
*/
package org.apache.camel.api.management.mbean;
+import org.apache.camel.Experimental;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.spi.RouteError;
public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
@@ -129,5 +131,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
@ManagedAttribute(description = "Oldest inflight exchange id")
String getOldestInflightExchangeId();
+ @Experimental
+ @ManagedAttribute(description = "Route controller")
+ Boolean getHasRouteController();
+ @Experimental
+ @ManagedAttribute(description = "Last error")
+ RouteError getLastError();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 3ebad6d..bf7cb59 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -32,6 +32,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
@@ -157,6 +158,8 @@ import org.apache.camel.spi.ReloadStrategy;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.spi.RestRegistry;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spi.RouteError;
import org.apache.camel.spi.RoutePolicyFactory;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.RuntimeEndpointRegistry;
@@ -309,6 +312,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private final RuntimeCamelCatalog runtimeCamelCatalog = new DefaultRuntimeCamelCatalog(this, true);
private SSLContextParameters sslContextParameters;
private final ThreadLocal<Set<String>> componentsInCreation = ThreadLocal.withInitial(HashSet::new);
+ private RouteController routeController;
/**
* Creates the {@link CamelContext} using {@link JndiRegistry} as registry,
@@ -346,6 +350,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
this.managementStrategy = createManagementStrategy();
this.managementMBeanAssembler = createManagementMBeanAssembler();
+ // Route controller
+ this.routeController = new DefaultRouteController(this);
+
// Call all registered trackers with this context
// Note, this may use a partially constructed object
CamelContextTrackerRegistry.INSTANCE.contextCreated(this);
@@ -852,6 +859,17 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
// Route Management Methods
// -----------------------------------------------------------------------
+    /**
+     * Replaces the route controller of this context and hands the context to it.
+     *
+     * @param routeController the new route controller, must not be {@code null}
+     * @throws NullPointerException if {@code routeController} is {@code null}
+     */
+    @Override
+    public void setRouteController(RouteController routeController) {
+        // Validate before assigning, so that a null argument cannot wipe out the
+        // controller that is currently in place.
+        this.routeController = java.util.Objects.requireNonNull(routeController, "routeController");
+        this.routeController.setCamelContext(this);
+    }
+
+    /**
+     * @return the route controller currently held by this context
+     */
+    @Override
+    public RouteController getRouteController() {
+        return routeController;
+    }
+
public List<RouteStartupOrder> getRouteStartupOrder() {
return routeStartupOrder;
}
@@ -1137,99 +1155,151 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
public synchronized void startRoute(String routeId) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- startRouteService(routeService, false);
+ try {
+ startRouteService(routeService, false);
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.START, e);
+ throw e;
+ }
}
}
public synchronized void resumeRoute(String routeId) throws Exception {
- if (!routeSupportsSuspension(routeId)) {
- // start route if suspension is not supported
- startRoute(routeId);
- return;
- }
+ DefaultRouteError.reset(this, routeId);
- RouteService routeService = routeServices.get(routeId);
- if (routeService != null) {
- resumeRouteService(routeService);
- // must resume the route as well
- Route route = getRoute(routeId);
- ServiceHelper.resumeService(route);
+ try {
+ if (!routeSupportsSuspension(routeId)) {
+ // start route if suspension is not supported
+ startRoute(routeId);
+ return;
+ }
+
+ RouteService routeService = routeServices.get(routeId);
+ if (routeService != null) {
+ resumeRouteService(routeService);
+ // must resume the route as well
+ Route route = getRoute(routeId);
+ ServiceHelper.resumeService(route);
+ }
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.RESUME, e);
+ throw e;
}
}
public synchronized boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ try {
+ RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
- boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout);
- if (completed) {
- // must stop route service as well
- stopRouteService(routeService, false);
- } else {
- // shutdown was aborted, make sure route is re-started properly
- startRouteService(routeService, false);
+ boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout);
+ if (completed) {
+ // must stop route service as well
+ stopRouteService(routeService, false);
+ } else {
+ // shutdown was aborted, make sure route is re-started properly
+ startRouteService(routeService, false);
+ }
+ return completed;
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e);
+ throw e;
}
- return completed;
}
+
return false;
}
public synchronized void stopRoute(String routeId) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
- routes.add(order);
+ try {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
- getShutdownStrategy().shutdown(this, routes);
- // must stop route service as well
- stopRouteService(routeService, false);
+ getShutdownStrategy().shutdown(this, routes);
+ // must stop route service as well
+ stopRouteService(routeService, false);
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e);
+ throw e;
+ }
}
}
public synchronized void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
- routes.add(order);
+ try {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
- getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
- // must stop route service as well
- stopRouteService(routeService, false);
+ getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
+ // must stop route service as well
+ stopRouteService(routeService, false);
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.STOP, e);
+ throw e;
+ }
}
}
public synchronized void shutdownRoute(String routeId) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
- routes.add(order);
+ try {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
- getShutdownStrategy().shutdown(this, routes);
- // must stop route service as well (and remove the routes from management)
- stopRouteService(routeService, true);
+ getShutdownStrategy().shutdown(this, routes);
+ // must stop route service as well (and remove the routes from management)
+ stopRouteService(routeService, true);
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.SHUTDOWN, e);
+ throw e;
+ }
}
}
public synchronized void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
- routes.add(order);
+ try {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService);
+ routes.add(order);
- getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
- // must stop route service as well (and remove the routes from management)
- stopRouteService(routeService, true);
+ getShutdownStrategy().shutdown(this, routes, timeout, timeUnit);
+ // must stop route service as well (and remove the routes from management)
+ stopRouteService(routeService, true);
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.SHUTDOWN, e);
+ throw e;
+ }
}
}
public synchronized boolean removeRoute(String routeId) throws Exception {
+ DefaultRouteError.reset(this, routeId);
+
// remove the route from ErrorHandlerBuilder if possible
if (getErrorHandlerBuilder() instanceof ErrorHandlerBuilderSupport) {
ErrorHandlerBuilderSupport builder = (ErrorHandlerBuilderSupport)getErrorHandlerBuilder();
@@ -1246,39 +1316,45 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
RouteService routeService = routeServices.get(routeId);
if (routeService != null) {
if (getRouteStatus(routeId).isStopped()) {
- routeService.setRemovingRoutes(true);
- shutdownRouteService(routeService);
- removeRouteDefinition(routeId);
- routeServices.remove(routeId);
- // remove route from startup order as well, as it was removed
- Iterator<RouteStartupOrder> it = routeStartupOrder.iterator();
- while (it.hasNext()) {
- RouteStartupOrder order = it.next();
- if (order.getRoute().getId().equals(routeId)) {
- it.remove();
+ try {
+ routeService.setRemovingRoutes(true);
+ shutdownRouteService(routeService);
+ removeRouteDefinition(routeId);
+ routeServices.remove(routeId);
+ // remove route from startup order as well, as it was removed
+ Iterator<RouteStartupOrder> it = routeStartupOrder.iterator();
+ while (it.hasNext()) {
+ RouteStartupOrder order = it.next();
+ if (order.getRoute().getId().equals(routeId)) {
+ it.remove();
+ }
}
- }
- // from the route which we have removed, then remove all its private endpoints
- // (eg the endpoints which are not in use by other routes)
- Set<Endpoint> toRemove = new LinkedHashSet<Endpoint>();
- for (Endpoint endpoint : endpointsInUse.get(routeId)) {
- // how many times is the endpoint in use
- int count = 0;
- for (Set<Endpoint> endpoints : endpointsInUse.values()) {
- if (endpoints.contains(endpoint)) {
- count++;
+ // from the route which we have removed, then remove all its private endpoints
+ // (eg the endpoints which are not in use by other routes)
+ Set<Endpoint> toRemove = new LinkedHashSet<Endpoint>();
+ for (Endpoint endpoint : endpointsInUse.get(routeId)) {
+ // how many times is the endpoint in use
+ int count = 0;
+ for (Set<Endpoint> endpoints : endpointsInUse.values()) {
+ if (endpoints.contains(endpoint)) {
+ count++;
+ }
+ }
+ // notice we will count ourselves so if there is only 1 then its safe to remove
+ if (count <= 1) {
+ toRemove.add(endpoint);
}
}
- // notice we will count ourselves so if there is only 1 then its safe to remove
- if (count <= 1) {
- toRemove.add(endpoint);
+ for (Endpoint endpoint : toRemove) {
+ log.debug("Removing: {} which was only in use by route: {}", endpoint, routeId);
+ removeEndpoint(endpoint);
}
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.REMOVE, e);
+ throw e;
}
- for (Endpoint endpoint : toRemove) {
- log.debug("Removing: {} which was only in use by route: {}", endpoint, routeId);
- removeEndpoint(endpoint);
- }
+
return true;
} else {
return false;
@@ -1288,49 +1364,63 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
public synchronized void suspendRoute(String routeId) throws Exception {
- if (!routeSupportsSuspension(routeId)) {
- // stop if we suspend is not supported
- stopRoute(routeId);
- return;
- }
+ try {
+ DefaultRouteError.reset(this, routeId);
- RouteService routeService = routeServices.get(routeId);
- if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- Route route = routeService.getRoutes().iterator().next();
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService);
- routes.add(order);
-
- getShutdownStrategy().suspend(this, routes);
- // must suspend route service as well
- suspendRouteService(routeService);
- // must suspend the route as well
- if (route instanceof SuspendableService) {
- ((SuspendableService) route).suspend();
+ if (!routeSupportsSuspension(routeId)) {
+ // stop if we suspend is not supported
+ stopRoute(routeId);
+ return;
}
+
+ RouteService routeService = routeServices.get(routeId);
+ if (routeService != null) {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ Route route = routeService.getRoutes().iterator().next();
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService);
+ routes.add(order);
+
+ getShutdownStrategy().suspend(this, routes);
+ // must suspend route service as well
+ suspendRouteService(routeService);
+ // must suspend the route as well
+ if (route instanceof SuspendableService) {
+ ((SuspendableService) route).suspend();
+ }
+ }
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.SUSPEND, e);
+ throw e;
}
}
public synchronized void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
- if (!routeSupportsSuspension(routeId)) {
- stopRoute(routeId, timeout, timeUnit);
- return;
- }
+ DefaultRouteError.reset(this, routeId);
- RouteService routeService = routeServices.get(routeId);
- if (routeService != null) {
- List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
- Route route = routeService.getRoutes().iterator().next();
- RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService);
- routes.add(order);
-
- getShutdownStrategy().suspend(this, routes, timeout, timeUnit);
- // must suspend route service as well
- suspendRouteService(routeService);
- // must suspend the route as well
- if (route instanceof SuspendableService) {
- ((SuspendableService) route).suspend();
+ try {
+ if (!routeSupportsSuspension(routeId)) {
+ stopRoute(routeId, timeout, timeUnit);
+ return;
+ }
+
+ RouteService routeService = routeServices.get(routeId);
+ if (routeService != null) {
+ List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
+ Route route = routeService.getRoutes().iterator().next();
+ RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService);
+ routes.add(order);
+
+ getShutdownStrategy().suspend(this, routes, timeout, timeUnit);
+ // must suspend route service as well
+ suspendRouteService(routeService);
+ // must suspend the route as well
+ if (route instanceof SuspendableService) {
+ ((SuspendableService) route).suspend();
+ }
}
+ } catch (Exception e) {
+ DefaultRouteError.set(this, routeId, RouteError.Phase.SUSPEND, e);
+ throw e;
}
}
@@ -3035,6 +3125,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
// [TODO] Remove in 3.0
Container.Instance.manage(this);
+ // Start the route controller
+ ServiceHelper.startServices(this.routeController);
+
doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup();
// if the context was configured with auto startup = false, and we are already started,
@@ -3162,8 +3255,8 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
if (log.isDebugEnabled()) {
- log.debug("Using ClassResolver={}, PackageScanClassResolver={}, ApplicationContextClassLoader={}",
- new Object[]{getClassResolver(), getPackageScanClassResolver(), getApplicationContextClassLoader()});
+ log.debug("Using ClassResolver={}, PackageScanClassResolver={}, ApplicationContextClassLoader={}, RouteController={}",
+ getClassResolver(), getPackageScanClassResolver(), getApplicationContextClassLoader(), getRouteController());
}
if (isStreamCaching()) {
@@ -3363,6 +3456,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
stopWatch.restart();
log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is shutting down");
EventHelper.notifyCamelContextStopping(this);
+
+ // Stop the route controller
+ ServiceHelper.stopAndShutdownService(this.routeController);
// stop route inputs in the same order as they was started so we stop the very first inputs first
try {
@@ -3879,7 +3975,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
for (LifecycleStrategy strategy : lifecycleStrategies) {
strategy.onServiceAdd(this, consumer, route);
}
- startService(consumer);
+ try {
+ startService(consumer);
+ route.getProperties().remove("route.start.exception");
+ } catch (Exception e) {
+ route.getProperties().put("route.start.exception", e);
+ throw e;
+ }
+
log.info("Route: " + route.getId() + " started and consuming from: " + endpoint);
}
@@ -3903,7 +4006,13 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
routeService.resume();
} else {
// and start the route service (no need to start children as they are already warmed up)
- routeService.start(false);
+ try {
+ routeService.start(false);
+ route.getProperties().remove("route.start.exception");
+ } catch (Exception e) {
+ route.getProperties().put("route.start.exception", e);
+ throw e;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
index 89342b8..869b660 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
@@ -40,6 +40,8 @@ import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.Contract;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spi.RouteError;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;
@@ -71,6 +73,8 @@ public class DefaultRouteContext implements RouteContext {
private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
private ShutdownRoute shutdownRoute;
private ShutdownRunningTask shutdownRunningTask;
+ private RouteError routeError;
+ private RouteController routeController;
public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
this.camelContext = camelContext;
@@ -442,4 +446,23 @@ public class DefaultRouteContext implements RouteContext {
return routePolicyList;
}
+ @Override
+ public RouteError getLastError() {
+ return routeError;
+ }
+
+ @Override
+ public void setLastError(RouteError routeError) {
+ this.routeError = routeError;
+ }
+
+ @Override
+ public RouteController getRouteController() {
+ return routeController;
+ }
+
+ @Override
+ public void setRouteController(RouteController routeController) {
+ this.routeController = routeController;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java
new file mode 100644
index 0000000..ff51cf3
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteController.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Experimental;
+import org.apache.camel.Route;
+import org.apache.camel.spi.RouteController;
+
+/**
+ * Default {@link RouteController} that performs every route operation by
+ * delegating directly to the {@link CamelContext} it has been configured with.
+ *
+ * The context is {@code null} when the no-arg constructor is used, so it has to
+ * be provided through {@link #setCamelContext(CamelContext)} before any of the
+ * route operations is invoked.
+ */
+@Experimental
+public class DefaultRouteController extends org.apache.camel.support.ServiceSupport implements RouteController {
+ // NOTE(review): initialized to an empty list by the constructor but not read
+ // or modified anywhere else in this class
+ private final List<Route> routes;
+ private CamelContext camelContext;
+
+ /**
+ * Creates a controller that is not yet bound to a camel context.
+ */
+ public DefaultRouteController() {
+ this(null);
+ }
+
+ /**
+ * Creates a controller bound to the given camel context.
+ *
+ * @param camelContext the context the route operations are delegated to, may be {@code null}
+ */
+ public DefaultRouteController(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ this.routes = new ArrayList<>();
+ }
+
+ // ***************************************************
+ // Properties
+ // ***************************************************
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ // ***************************************************
+ // Life cycle
+ // ***************************************************
+
+ @Override
+ protected void doStart() throws Exception {
+ // nothing to start: this controller holds no resources of its own
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // nothing to stop: this controller holds no resources of its own
+ }
+
+ // ***************************************************
+ // Route management
+ //
+ // Each operation below is forwarded unchanged to the
+ // method with the same name on the camel context.
+ // ***************************************************
+
+ @Override
+ public void startRoute(String routeId) throws Exception {
+ camelContext.startRoute(routeId);
+ }
+
+ @Override
+ public void stopRoute(String routeId) throws Exception {
+ camelContext.stopRoute(routeId);
+ }
+
+ @Override
+ public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ camelContext.stopRoute(routeId, timeout, timeUnit);
+ }
+
+ @Override
+ public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
+ return camelContext.stopRoute(routeId, timeout, timeUnit, abortAfterTimeout);
+ }
+
+ @Override
+ public void suspendRoute(String routeId) throws Exception {
+ camelContext.suspendRoute(routeId);
+ }
+
+ @Override
+ public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ camelContext.suspendRoute(routeId, timeout, timeUnit);
+ }
+
+ @Override
+ public void resumeRoute(String routeId) throws Exception {
+ camelContext.resumeRoute(routeId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java
new file mode 100644
index 0000000..0f80594
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteError.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Route;
+import org.apache.camel.spi.RouteError;
+
+/**
+ * Immutable {@link RouteError} holding the life-cycle phase in which a route
+ * operation failed together with the failure itself.
+ *
+ * The static helpers store or clear the error on the route context of a route
+ * looked up by id.
+ */
+public class DefaultRouteError implements RouteError {
+ private final RouteError.Phase phase;
+ private final Throwable throwable;
+
+ /**
+ * @param phase the phase in which the failure happened
+ * @param throwable the failure
+ */
+ public DefaultRouteError(Phase phase, Throwable throwable) {
+ this.phase = phase;
+ this.throwable = throwable;
+ }
+
+ @Override
+ public Phase getPhase() {
+ return phase;
+ }
+
+ @Override
+ public Throwable getException() {
+ return throwable;
+ }
+
+ @Override
+ public String toString() {
+ return "DefaultRouteError{"
+ + "phase=" + phase
+ + ", throwable=" + throwable
+ + '}';
+ }
+
+ // ***********************************
+ // Helpers
+ // ***********************************
+
+ /**
+ * Records a new error as the last error of the given route.
+ * Does nothing if the context has no route with the given id.
+ *
+ * @param context the context used to look up the route
+ * @param routeId the id of the route that failed
+ * @param phase the phase in which the failure happened
+ * @param throwable the failure
+ */
+ public static void set(CamelContext context, String routeId, RouteError.Phase phase, Throwable throwable) {
+ Route route = context.getRoute(routeId);
+ if (route != null) {
+ route.getRouteContext().setLastError(new DefaultRouteError(phase, throwable));
+ }
+ }
+
+ /**
+ * Clears the last error of the given route by setting it to {@code null}.
+ * Does nothing if the context has no route with the given id.
+ *
+ * @param context the context used to look up the route
+ * @param routeId the id of the route whose last error is cleared
+ */
+ public static void reset(CamelContext context, String routeId) {
+ Route route = context.getRoute(routeId);
+ if (route != null) {
+ route.getRouteContext().setLastError(null);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
index 634f4cd..eb731da 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
@@ -306,6 +306,7 @@ public class RouteService extends ChildServiceSupport {
routePolicy.onRemove(route);
}
}
+
// fire event
EventHelper.notifyRouteRemoved(camelContext, route);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
new file mode 100644
index 0000000..1f92653
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/SupervisingRouteController.java
@@ -0,0 +1,556 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EventObject;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.Route;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.StartupListener;
+import org.apache.camel.management.event.CamelContextStartedEvent;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.RouteController;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.spi.RoutePolicyFactory;
+import org.apache.camel.support.EventNotifierSupport;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffContext;
+import org.apache.camel.util.backoff.BackOffTimer;
+import org.apache.camel.util.function.ThrowingConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple implementation of the {@link RouteController} that delays the startup
+ * of the routes after the camel context startup and retries to start failing routes.
+ *
+ * NOTE: this is experimental/unstable.
+ */
+@Experimental
+public class SupervisingRouteController extends DefaultRouteController {
+ private static final Logger LOGGER = LoggerFactory.getLogger(SupervisingRouteController.class);
+ private final Object lock;
+ private final AtomicBoolean contextStarted;
+ private final Set<Route> startedRoutes;
+ private final Set<Route> stoppedRoutes;
+ private final CamelContextStartupListener listener;
+ private final RouteManager routeManager;
+ private BackOffTimer timer;
+ private ScheduledExecutorService executorService;
+ private BackOff defaultBackOff;
+ private Map<String, BackOff> backOffConfigurations;
+
+ /**
+ * Creates a supervising controller using a default {@link BackOff} and no
+ * per-route back-off configuration, and starts the internal listener used
+ * to detect when the camel context has started.
+ *
+ * @throws RuntimeException wrapping any failure raised while starting the listener
+ */
+ public SupervisingRouteController() {
+ // Routes are ordered by their configured startup order, routes without
+ // one sort first. The route id is used as a tie-breaker because a
+ // TreeSet treats elements comparing as equal as duplicates: without it,
+ // all the routes sharing a startup order (or having none) would collapse
+ // into a single entry and the others would never be tracked.
+ final Comparator<Route> comparator = Comparator
+ .comparing((Route route) -> Optional.ofNullable(route.getRouteContext().getRoute().getStartupOrder()).orElse(Integer.MIN_VALUE))
+ .thenComparing(Route::getId);
+
+ this.lock = new Object();
+ this.contextStarted = new AtomicBoolean(false);
+ // stopped routes are iterated in startup order, started routes in the reverse one
+ this.stoppedRoutes = new TreeSet<>(comparator);
+ this.startedRoutes = new TreeSet<>(comparator.reversed());
+ this.routeManager = new RouteManager();
+ this.defaultBackOff = BackOff.builder().build();
+ this.backOffConfigurations = new HashMap<>();
+
+ try {
+ this.listener = new CamelContextStartupListener();
+ this.listener.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // *********************************
+ // Properties
+ // *********************************
+
+ public BackOff getDefaultBackOff() {
+ return defaultBackOff;
+ }
+
+ public void setDefaultBackOff(BackOff defaultBackOff) {
+ this.defaultBackOff = defaultBackOff;
+ }
+
+ public Map<String, BackOff> getBackOffConfigurations() {
+ return backOffConfigurations;
+ }
+
+ public void setBackOffConfigurations(Map<String, BackOff> backOffConfigurations) {
+ this.backOffConfigurations = backOffConfigurations;
+ }
+
+ public BackOff getBackOff(String id) {
+ return backOffConfigurations.getOrDefault(id, defaultBackOff);
+ }
+
+ public void setBackOff(String id, BackOff backOff) {
+ backOffConfigurations.put(id, backOff);
+ }
+
+ // *********************************
+ // Lifecycle
+ // *********************************
+
+ @Override
+ protected void doStart() throws Exception {
+ final CamelContext context = getCamelContext();
+
+ context.setAutoStartup(false);
+ context.addRoutePolicyFactory(new ManagedRoutePolicyFactory());
+ context.addStartupListener(this.listener);
+ context.getManagementStrategy().addEventNotifier(this.listener);
+
+ executorService = context.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "SupervisingRouteController");
+ timer = new BackOffTimer(executorService);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (getCamelContext() != null && executorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdown(executorService);
+ executorService = null;
+ timer = null;
+ }
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ if (getCamelContext() != null) {
+ getCamelContext().getManagementStrategy().removeEventNotifier(listener);
+ }
+ }
+
+ // *********************************
+ // Route management
+ // *********************************
+
+ @Override
+ public void startRoute(String routeId) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStartRoute(context, route, true, r -> super.startRoute(routeId));
+ }
+
+ @Override
+ public void stopRoute(String routeId) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStopRoute(context, route, true, r -> super.stopRoute(routeId));
+ }
+
+ @Override
+ public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStopRoute(context, route, true, r -> super.stopRoute(r.getId(), timeout, timeUnit));
+ }
+
+ @Override
+ public boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+ final AtomicBoolean result = new AtomicBoolean(false);
+
+ if (route == null) {
+ return false;
+ }
+
+ doStopRoute(context, route, true, r -> result.set(super.stopRoute(r.getId(), timeout, timeUnit, abortAfterTimeout)));
+
+ return result.get();
+ }
+
+ @Override
+ public void suspendRoute(String routeId) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStopRoute(context, route, true, r -> super.suspendRoute(r.getId()));
+ }
+
+ @Override
+ public void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStopRoute(context, route, true, r -> super.suspendRoute(r.getId(), timeout, timeUnit));
+ }
+
+ /**
+ * Resumes the route with the given id, taking it out of supervision while
+ * doing so. Does nothing if the camel context has no route with that id.
+ *
+ * NOTE(review): the operation delegates to {@code super.startRoute} and not
+ * to {@code super.resumeRoute}; confirm that starting (instead of resuming)
+ * a suspended route is the intended behaviour.
+ */
+ @Override
+ public void resumeRoute(String routeId) throws Exception {
+ final CamelContext context = getCamelContext();
+ final Route route = context.getRoute(routeId);
+
+ if (route == null) {
+ return;
+ }
+
+ doStartRoute(context, route, true, r -> super.startRoute(routeId));
+ }
+
+ // *********************************
+ // Helpers
+ // *********************************
+
+ /**
+ * Runs a stop-like operation (stop or suspend) on a route while holding the
+ * controller lock and then moves the route to the stopped set.
+ *
+ * @param context the camel context used to read the route status
+ * @param route the route to operate on
+ * @param checker if {@code true} any pending supervised restart of the route is cancelled first
+ * @param consumer the actual operation to perform on the route
+ * @throws Exception if the operation fails; in that case the route is left in
+ * the set it was in and keeps its route controller
+ */
+ private void doStopRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception {
+ synchronized (lock) {
+ if (checker) {
+ // remove them from checked routes so they don't get started by the
+ // routes check task as a manual operation on the routes indicates that
+ // the route is then managed manually
+ routeManager.release(route);
+ }
+
+ // note that supervision has already been released above, even if
+ // the operation is skipped here because of the route status
+ ServiceStatus status = context.getRouteStatus(route.getId());
+ if (!status.isStoppable()) {
+ LOGGER.debug("Route {} status is {}, skipping", route.getId(), status);
+ return;
+ }
+
+ consumer.accept(route);
+
+ startedRoutes.remove(route);
+ stoppedRoutes.add(route);
+
+ // Mark the route as un-managed
+ route.getRouteContext().setRouteController(null);
+ }
+ }
+
+ /**
+ * Runs a start-like operation on a route while holding the controller lock
+ * and, on success, moves the route to the started set.
+ *
+ * @param context the camel context used to read the route status
+ * @param route the route to operate on
+ * @param checker if {@code true} any pending supervised restart is cancelled before
+ * the operation and a new one is scheduled if the operation fails;
+ * the supervised restart task itself passes {@code false}
+ * @param consumer the actual operation to perform on the route
+ * @throws Exception if the operation fails; the exception is rethrown after the
+ * route has (optionally) been handed over to the route manager
+ */
+ private void doStartRoute(CamelContext context, Route route, boolean checker, ThrowingConsumer<Route, Exception> consumer) throws Exception {
+ synchronized (lock) {
+ ServiceStatus status = context.getRouteStatus(route.getId());
+ if (!status.isStartable()) {
+ LOGGER.debug("Route {} status is {}, skipping", route.getId(), status);
+ return;
+ }
+
+ try {
+ // remove the route from any queue
+ stoppedRoutes.remove(route);
+ startedRoutes.remove(route);
+
+ if (checker) {
+ routeManager.release(route);
+ }
+
+ // Mark the route as managed
+ route.getRouteContext().setRouteController(this);
+
+ consumer.accept(route);
+
+ // route started successfully
+ startedRoutes.add(route);
+ } catch (Exception e) {
+
+ if (checker) {
+ // if the start fails the route is moved under controller
+ // supervision so that it gets (eventually) restarted
+ routeManager.start(route);
+ }
+
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Starts all the routes currently held in the stopped set, unless the
+ * controller is not allowed to run.
+ */
+ private void startRoutes() {
+ if (!isRunAllowed()) {
+ return;
+ }
+
+ List<String> routes;
+
+ // take a snapshot of the ids under the lock as starting a route
+ // modifies the stopped set
+ synchronized (lock) {
+ routes = stoppedRoutes.stream().map(Route::getId).collect(Collectors.toList());
+ }
+
+ for (String route: routes) {
+ try {
+ startRoute(route);
+ } catch (Exception e) {
+ // ignored, exception handled by startRoute
+ }
+ }
+ }
+
+ /**
+ * Stops all the routes currently held in the started set.
+ *
+ * NOTE(review): this method is synchronized on the controller instance in
+ * addition to using the internal lock, and a failure to stop a route is
+ * silently dropped here; confirm both are intended.
+ */
+ private synchronized void stopRoutes() {
+ List<String> routes;
+
+ // take a snapshot of the ids under the lock as stopping a route
+ // modifies the started set
+ synchronized (lock) {
+ routes = startedRoutes.stream().map(Route::getId).collect(Collectors.toList());
+ }
+
+ for (String route: routes) {
+ try {
+ stopRoute(route);
+ } catch (Exception e) {
+ // ignored, exception handled by stopRoute
+ }
+ }
+ }
+
+ // *********************************
+ // RouteChecker
+ // *********************************
+
+ private class RouteManager {
+ private final Logger logger;
+ private final ConcurrentMap<Route, CompletableFuture<BackOffContext>> routes;
+
+ RouteManager() {
+ this.logger = LoggerFactory.getLogger(RouteManager.class);
+ this.routes = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Puts the given route under supervision: unless the route is already
+ * supervised, a restart task is scheduled on the back-off timer using
+ * the back-off configured for the route id.
+ *
+ * @param route the route to supervise
+ */
+ void start(Route route) {
+ route.getRouteContext().setRouteController(SupervisingRouteController.this);
+
+ final CamelContext camelContext = getCamelContext();
+
+ routes.computeIfAbsent(
+ route,
+ r -> {
+ BackOff backOff = getBackOff(r.getId());
+
+ logger.info("Start supervising route: {} with back-off: {}", r.getId(), backOff);
+
+ // Return this future as cancel does not have effect on the
+ // computation (future chain)
+ CompletableFuture<BackOffContext> future = timer.schedule(backOff, context -> {
+ try {
+ logger.info("Try to restart route: {}", r.getId());
+
+ doStartRoute(camelContext, r, false, rx -> SupervisingRouteController.super.startRoute(rx.getId()));
+ // the route has been started, no further attempts are needed
+ return false;
+ } catch (Exception e) {
+ // the start failed, ask for another attempt
+ return true;
+ }
+ });
+
+ future.whenComplete((context, throwable) -> {
+ if (context == null || context.isExhausted()) {
+ // This indicates that the future has been cancelled
+ // or that back-off retry is exhausted thus if the
+ // route is not started it is moved out of the supervisor.
+
+ if (context != null && context.isExhausted()) {
+ LOGGER.info("Back-off for route {} is exhausted, no more attempts will be made", route.getId());
+ }
+
+ // a single acquisition of the lock guards both sets
+ synchronized (lock) {
+ ServiceStatus status = camelContext.getRouteStatus(route.getId());
+
+ if (status.isStopped() || status.isStopping()) {
+ LOGGER.info("Route {} has status {}, stop supervising it", route.getId(), status);
+
+ r.getRouteContext().setRouteController(null);
+ stoppedRoutes.add(r);
+ } else if (status.isStarted() || status.isStarting()) {
+ startedRoutes.add(r);
+ }
+ }
+ }
+
+ // remove the entry only if it is still bound to this future so
+ // a late completion cannot evict a newer supervision of the
+ // same route
+ routes.remove(r, future);
+ });
+
+ return future;
+ }
+ );
+ }
+
+ boolean release(Route route) {
+ CompletableFuture<BackOffContext> future = routes.remove(route);
+ if (future != null) {
+ future.cancel(true);
+ }
+
+ return future != null;
+ }
+
+ void clear() {
+ routes.forEach((k, v) -> v.cancel(true));
+ routes.clear();
+ }
+
+ boolean isSupervising(Route route) {
+ return routes.containsKey(route);
+ }
+
+ Collection<Route> routes() {
+ return routes.keySet();
+ }
+ }
+
+ private boolean isSupervising(Route route) {
+ synchronized (lock) {
+ return stoppedRoutes.contains(route) || startedRoutes.contains(route) || routeManager.isSupervising(route);
+ }
+ }
+
+ // *********************************
+ // Policies
+ // *********************************
+
+ private class ManagedRoutePolicyFactory implements RoutePolicyFactory {
+ private final RoutePolicy policy = new ManagedRoutePolicy();
+
+ @Override
+ public RoutePolicy createRoutePolicy(CamelContext camelContext, String routeId, RouteDefinition route) {
+ return policy;
+ }
+ }
+
+ private class ManagedRoutePolicy implements RoutePolicy {
+ /**
+ * Binds the route to this controller and disables its auto startup.
+ * If the camel context is already started the route is started right
+ * away, otherwise it is queued in the stopped set.
+ *
+ * A failure to start the route is logged and not propagated.
+ */
+ @Override
+ public void onInit(Route route) {
+ route.getRouteContext().setRouteController(SupervisingRouteController.this);
+ route.getRouteContext().getRoute().setAutoStartup("false");
+
+ if (contextStarted.get()) {
+ LOGGER.debug("Context is started: add route {} to startable routes", route.getId());
+ try {
+ SupervisingRouteController.this.doStartRoute(
+ getCamelContext(),
+ route,
+ true,
+ r -> SupervisingRouteController.super.startRoute(r.getId())
+ );
+ } catch (Exception e) {
+ // report through the logger instead of printing to stderr
+ LOGGER.warn("Failed to start route {}", route.getId(), e);
+ }
+ } else {
+ LOGGER.debug("Context is not started: add route {} to stopped routes", route.getId());
+ stoppedRoutes.add(route);
+ }
+ }
+
+ @Override
+ public void onRemove(Route route) {
+ }
+
+ @Override
+ public void onStart(Route route) {
+ }
+
+ @Override
+ public void onStop(Route route) {
+ }
+
+ @Override
+ public void onSuspend(Route route) {
+ }
+
+ @Override
+ public void onResume(Route route) {
+ }
+
+ @Override
+ public void onExchangeBegin(Route route, Exchange exchange) {
+ // NO-OP
+ }
+
+ @Override
+ public void onExchangeDone(Route route, Exchange exchange) {
+ // NO-OP
+ }
+ }
+
+ private class CamelContextStartupListener extends EventNotifierSupport implements StartupListener {
+ @Override
+ public void notify(EventObject event) throws Exception {
+ onCamelContextStarted();
+ }
+
+ @Override
+ public boolean isEnabled(EventObject event) {
+ return event instanceof CamelContextStartedEvent;
+ }
+
+ @Override
+ public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
+ if (alreadyStarted) {
+ // Invoke it only if the context was already started as this
+ // method is not invoked at last event as documented but after
+ // routes warm-up so this is useful for routes deployed after
+ // the camel context has been started-up. For standard routes
+ // configuration the notification of the camel context started
+ // is provided by EventNotifier.
+ //
+ // We should check why this callback is not invoked at latest
+ // stage, or maybe rename it as it is misleading and provide a
+ // better alternative for intercept camel events.
+ onCamelContextStarted();
+ }
+ }
+
+ private void onCamelContextStarted() {
+ // Start managing the routes only when the camel context is started
+ // so start/stop of managed routes do not clash with CamelContext
+ // startup
+ if (contextStarted.compareAndSet(false, true)) {
+ startRoutes();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index a977350..42e7fa6 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -49,6 +49,7 @@ import org.apache.camel.model.ModelHelper;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.spi.RouteError;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.XmlLineNumberParser;
@@ -215,28 +216,28 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- context.startRoute(getRouteId());
+ context.getRouteController().startRoute(getRouteId());
}
public void stop() throws Exception {
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- context.stopRoute(getRouteId());
+ context.getRouteController().stopRoute(getRouteId());
}
public void stop(long timeout) throws Exception {
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS);
+ context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS);
}
public boolean stop(Long timeout, Boolean abortAfterTimeout) throws Exception {
if (!context.getStatus().isStarted()) {
throw new IllegalArgumentException("CamelContext is not started");
}
- return context.stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout);
+ return context.getRouteController().stopRoute(getRouteId(), timeout, TimeUnit.SECONDS, abortAfterTimeout);
}
public void shutdown() throws Exception {
@@ -481,6 +482,16 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
}
}
+ @Override
+ public Boolean getHasRouteController() {
+     // TRUE when a route controller is set on the route context of this route
+     return Boolean.valueOf(route.getRouteContext().getRouteController() != null);
+ }
+
+ @Override
+ public RouteError getLastError() {
+     // The error is held by the route context of this route
+     final RouteError lastError = route.getRouteContext().getLastError();
+     return lastError;
+ }
+
/**
* Used for sorting the processor mbeans accordingly to their index.
*/
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
index 54e6ad0..a8cad9c 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointAware;
+import org.apache.camel.Experimental;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeConfiguration;
import org.apache.camel.model.FromDefinition;
@@ -192,4 +193,39 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware {
*/
int getAndIncrement(ProcessorDefinition<?> node);
+ /**
+ * Gets the last error.
+ * <p/>
+ * This default implementation always returns <tt>null</tt>.
+ *
+ * @return the error, or <tt>null</tt> if there is none
+ */
+ default RouteError getLastError() {
+ return null;
+ }
+
+ /**
+ * Sets the last error.
+ * <p/>
+ * This default implementation does nothing, the given error is not stored.
+ *
+ * @param error the error
+ */
+ default void setLastError(RouteError error) {
+ }
+
+ /**
+ * Gets the {@link RouteController} for this route.
+ * <p/>
+ * This default implementation always returns <tt>null</tt>.
+ *
+ * @return the route controller, or <tt>null</tt> if there is none
+ */
+ @Experimental
+ default RouteController getRouteController() {
+ return null;
+ }
+
+ /**
+ * Sets the {@link RouteController} for this route.
+ * <p/>
+ * This default implementation does nothing, the given controller is not stored.
+ *
+ * @param controller the RouteController
+ */
+ @Experimental
+ default void setRouteController(RouteController controller) {
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteController.java b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java
new file mode 100644
index 0000000..0f25a2d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/RouteController.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Experimental;
+import org.apache.camel.Service;
+
+/**
+ * SPI to customize the routes life-cycle (start, stop, suspend and resume of
+ * a route identified by its id).
+ */
+@Experimental
+public interface RouteController extends CamelContextAware, Service {
+
+ /**
+ * Starts the route with the given id.
+ *
+ * @param routeId the route id
+ * @throws Exception is thrown if the route could not be started for whatever reason
+ */
+ void startRoute(String routeId) throws Exception;
+
+ /**
+ * Stops the route with the given id.
+ *
+ * @param routeId the route id
+ * @throws Exception is thrown if the route could not be stopped for whatever reason
+ */
+ void stopRoute(String routeId) throws Exception;
+
+ /**
+ * Stops the route with the given id using the given timeout.
+ *
+ * @param routeId the route id
+ * @param timeout the timeout
+ * @param timeUnit the unit of the timeout
+ * @throws Exception is thrown if the route could not be stopped for whatever reason
+ */
+ void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception;
+
+ /**
+ * Stops the route with the given id using the given timeout.
+ *
+ * @param routeId the route id
+ * @param timeout the timeout
+ * @param timeUnit the unit of the timeout
+ * @param abortAfterTimeout NOTE(review): presumably whether the stop should be aborted when the timeout is hit - confirm against the implementations
+ * @return NOTE(review): presumably <tt>true</tt> if the route was stopped before the timeout - confirm against the implementations
+ * @throws Exception is thrown if the route could not be stopped for whatever reason
+ */
+ boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception;
+
+ /**
+ * Suspends the route with the given id.
+ *
+ * @param routeId the route id
+ * @throws Exception is thrown if the route could not be suspended for whatever reason
+ */
+ void suspendRoute(String routeId) throws Exception;
+
+ /**
+ * Suspends the route with the given id using the given timeout.
+ *
+ * @param routeId the route id
+ * @param timeout the timeout
+ * @param timeUnit the unit of the timeout
+ * @throws Exception is thrown if the route could not be suspended for whatever reason
+ */
+ void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception;
+
+ /**
+ * Resumes the route with the given id.
+ *
+ * @param routeId the route id
+ * @throws Exception is thrown if the route could not be resumed for whatever reason
+ */
+ void resumeRoute(String routeId) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/spi/RouteError.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/RouteError.java b/camel-core/src/main/java/org/apache/camel/spi/RouteError.java
new file mode 100644
index 0000000..db420f4
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/RouteError.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+/**
+ * Describes an error associated with a route: the phase the error is
+ * associated with and the error itself.
+ */
+public interface RouteError {
+ /**
+ * The phases an error can be associated with.
+ */
+ enum Phase {
+ START,
+ STOP,
+ SUSPEND,
+ RESUME,
+ SHUTDOWN,
+ REMOVE
+ }
+
+ /**
+ * Gets the phase associated with the error.
+ *
+ * @return the phase.
+ */
+ Phase getPhase();
+
+ /**
+ * Gets the error.
+ *
+ * @return the error.
+ */
+ Throwable getException();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
index ba9b02f..a1dc3da 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
@@ -423,6 +423,21 @@ public final class ObjectHelper {
}
/**
+ * Returns the given value if it is not empty, otherwise the value provided by the supplier.
+ * <p/>
+ * The value is tested with <tt>isNotEmpty</tt>; the supplier is only invoked when that test fails.
+ *
+ * @param value the value to test
+ * @param supplier the supplier used to get the result if value is empty, it is validated with <tt>notNull</tt> before use
+ * @return the value if it is not empty, otherwise the result of the supplier
+ */
+ public static <T> T supplyIfEmpty(T value, Supplier<T> supplier) {
+ ObjectHelper.notNull(supplier, "Supplier");
+ if (isNotEmpty(value)) {
+ return value;
+ }
+
+ return supplier.get();
+ }
+
+ /**
* Tests whether the value is <b>not</b> <tt>null</tt>, an empty string, an empty collection or a map
*
* @param value the value, if its a String it will be tested for text length as well
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java
new file mode 100644
index 0000000..1e3d326
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOff.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Configuration of a back-off policy: the initial delay, the multiplier
+ * applied to the delay and the limits (max delay, max elapsed time and max
+ * attempts).
+ * <p/>
+ * Instances can be created directly or through {@link #builder()}. An empty
+ * (<tt>null</tt>) setting is replaced by its default, both by the constructor
+ * and by the setters, so that consumers of the getters do not have to deal
+ * with missing values.
+ */
+public final class BackOff {
+    /** Delay value used to signal that no further attempt has to be made. */
+    public static final long NEVER = -1L;
+    /** The longest duration that can be expressed in milliseconds, used as "no limit". */
+    public static final Duration MAX_DURATION = Duration.ofMillis(Long.MAX_VALUE);
+    /** Default initial delay: 2 seconds. */
+    public static final Duration DEFAULT_DELAY = Duration.ofSeconds(2);
+    /** Default multiplier: 1, i.e. the delay does not grow. */
+    public static final double DEFAULT_MULTIPLIER = 1d;
+
+    private Duration delay;
+    private Duration maxDelay;
+    private Duration maxElapsedTime;
+    private Long maxAttempts;
+    private Double multiplier;
+
+    /**
+     * Creates a back-off policy with the default delay and multiplier and
+     * without limits.
+     */
+    public BackOff() {
+        this(DEFAULT_DELAY, MAX_DURATION, MAX_DURATION, Long.MAX_VALUE, DEFAULT_MULTIPLIER);
+    }
+
+    /**
+     * Creates a back-off policy, each empty argument is replaced by its default.
+     *
+     * @param delay the initial delay, defaults to {@link #DEFAULT_DELAY}
+     * @param maxDelay the max delay, defaults to {@link #MAX_DURATION}
+     * @param maxElapsedTime the max elapsed time, defaults to {@link #MAX_DURATION}
+     * @param maxAttempts the max number of attempts, defaults to {@link Long#MAX_VALUE}
+     * @param multiplier the multiplier applied to the delay, defaults to {@link #DEFAULT_MULTIPLIER}
+     */
+    public BackOff(Duration delay, Duration maxDelay, Duration maxElapsedTime, Long maxAttempts, Double multiplier) {
+        this.delay = ObjectHelper.supplyIfEmpty(delay, () -> DEFAULT_DELAY);
+        this.maxDelay = ObjectHelper.supplyIfEmpty(maxDelay, () -> MAX_DURATION);
+        this.maxElapsedTime = ObjectHelper.supplyIfEmpty(maxElapsedTime, () -> MAX_DURATION);
+        this.maxAttempts = ObjectHelper.supplyIfEmpty(maxAttempts, () -> Long.MAX_VALUE);
+        this.multiplier = ObjectHelper.supplyIfEmpty(multiplier, () -> DEFAULT_MULTIPLIER);
+    }
+
+    // *************************************
+    // Properties
+    // *************************************
+
+    public Duration getDelay() {
+        return delay;
+    }
+
+    /**
+     * Sets the initial delay, an empty value restores {@link #DEFAULT_DELAY}.
+     */
+    public void setDelay(Duration delay) {
+        // Same defaulting as the constructor, so the getter never hands out
+        // a null that would fail later on when the delay is computed
+        this.delay = ObjectHelper.supplyIfEmpty(delay, () -> DEFAULT_DELAY);
+    }
+
+    public Duration getMaxDelay() {
+        return maxDelay;
+    }
+
+    /**
+     * Sets the max delay, an empty value restores {@link #MAX_DURATION}.
+     */
+    public void setMaxDelay(Duration maxDelay) {
+        this.maxDelay = ObjectHelper.supplyIfEmpty(maxDelay, () -> MAX_DURATION);
+    }
+
+    public Duration getMaxElapsedTime() {
+        return maxElapsedTime;
+    }
+
+    /**
+     * Sets the max elapsed time, an empty value restores {@link #MAX_DURATION}.
+     */
+    public void setMaxElapsedTime(Duration maxElapsedTime) {
+        this.maxElapsedTime = ObjectHelper.supplyIfEmpty(maxElapsedTime, () -> MAX_DURATION);
+    }
+
+    public Long getMaxAttempts() {
+        return maxAttempts;
+    }
+
+    /**
+     * Sets the max number of attempts, an empty value restores {@link Long#MAX_VALUE}.
+     */
+    public void setMaxAttempts(Long maxAttempts) {
+        this.maxAttempts = ObjectHelper.supplyIfEmpty(maxAttempts, () -> Long.MAX_VALUE);
+    }
+
+    public Double getMultiplier() {
+        return multiplier;
+    }
+
+    /**
+     * Sets the multiplier, an empty value restores {@link #DEFAULT_MULTIPLIER}.
+     */
+    public void setMultiplier(Double multiplier) {
+        this.multiplier = ObjectHelper.supplyIfEmpty(multiplier, () -> DEFAULT_MULTIPLIER);
+    }
+
+    @Override
+    public String toString() {
+        return "BackOff{"
+            + "delay=" + delay
+            + ", maxDelay=" + maxDelay
+            + ", maxElapsedTime=" + maxElapsedTime
+            + ", maxAttempts=" + maxAttempts
+            + ", multiplier=" + multiplier
+            + '}';
+    }
+
+    // *****************************************
+    // Builder
+    // *****************************************
+
+    /**
+     * Creates a new builder initialized with the default settings.
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * A builder for {@link BackOff}, numeric delays without an explicit
+     * unit are expressed in milliseconds.
+     */
+    public static final class Builder {
+        private Duration delay = BackOff.DEFAULT_DELAY;
+        private Duration maxDelay = BackOff.MAX_DURATION;
+        private Duration maxElapsedTime = BackOff.MAX_DURATION;
+        private Long maxAttempts = Long.MAX_VALUE;
+        private Double multiplier = BackOff.DEFAULT_MULTIPLIER;
+
+        /**
+         * Copies all the settings of the given template into this builder.
+         */
+        public Builder read(BackOff template) {
+            delay = template.delay;
+            maxDelay = template.maxDelay;
+            maxElapsedTime = template.maxElapsedTime;
+            maxAttempts = template.maxAttempts;
+            multiplier = template.multiplier;
+
+            return this;
+        }
+
+        public Builder delay(Duration delay) {
+            this.delay = delay;
+            return this;
+        }
+
+        public Builder delay(long delay, TimeUnit unit) {
+            return delay(Duration.ofMillis(unit.toMillis(delay)));
+        }
+
+        public Builder delay(long delay) {
+            return delay(Duration.ofMillis(delay));
+        }
+
+        public Builder maxDelay(Duration maxDelay) {
+            this.maxDelay = maxDelay;
+            return this;
+        }
+
+        public Builder maxDelay(long maxDelay, TimeUnit unit) {
+            return maxDelay(Duration.ofMillis(unit.toMillis(maxDelay)));
+        }
+
+        public Builder maxDelay(long maxDelay) {
+            return maxDelay(Duration.ofMillis(maxDelay));
+        }
+
+        public Builder maxElapsedTime(Duration maxElapsedTime) {
+            this.maxElapsedTime = maxElapsedTime;
+            return this;
+        }
+
+        public Builder maxElapsedTime(long maxElapsedTime, TimeUnit unit) {
+            return maxElapsedTime(Duration.ofMillis(unit.toMillis(maxElapsedTime)));
+        }
+
+        public Builder maxElapsedTime(long maxElapsedTime) {
+            return maxElapsedTime(Duration.ofMillis(maxElapsedTime));
+        }
+
+        public Builder maxAttempts(Long attempts) {
+            this.maxAttempts = attempts;
+            return this;
+        }
+
+        public Builder multiplier(Double multiplier) {
+            this.multiplier = multiplier;
+            return this;
+        }
+
+        /**
+         * Builds the {@link BackOff}, empty settings are replaced by the
+         * defaults by the {@link BackOff} constructor.
+         */
+        public BackOff build() {
+            return new BackOff(delay, maxDelay, maxElapsedTime, maxAttempts, multiplier);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java
new file mode 100644
index 0000000..502f0ad
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffContext.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+
+/**
+ * Tracks the state of a back-off sequence driven by a {@link BackOff}
+ * configuration: the number of attempts, the current delay and the sum of
+ * the delays handed out so far.
+ * <p/>
+ * A current delay equal to {@link BackOff#NEVER} marks the context as
+ * exhausted (or explicitly stopped). The state is kept in plain fields and
+ * this class performs no synchronization on its own.
+ */
+public final class BackOffContext {
+    private final BackOff backOff;
+
+    private long currentAttempts;
+    private long currentDelay;
+    private long currentElapsedTime;
+
+    /**
+     * Creates a context whose current delay is the delay configured on the
+     * given {@link BackOff}.
+     *
+     * @param backOff the back-off configuration
+     */
+    public BackOffContext(BackOff backOff) {
+        this.backOff = backOff;
+        this.currentAttempts = 0;
+        this.currentDelay = backOff.getDelay().toMillis();
+        this.currentElapsedTime = 0;
+    }
+
+    // *************************************
+    // Properties
+    // *************************************
+
+    public BackOff backOff() {
+        return backOff;
+    }
+
+    public long getCurrentAttempts() {
+        return currentAttempts;
+    }
+
+    public long getCurrentDelay() {
+        return currentDelay;
+    }
+
+    public long getCurrentElapsedTime() {
+        return currentElapsedTime;
+    }
+
+    /**
+     * @return <tt>true</tt> if the current delay is {@link BackOff#NEVER}
+     */
+    public boolean isExhausted() {
+        return currentDelay == BackOff.NEVER;
+    }
+
+    // *************************************
+    // Impl
+    // *************************************
+
+    /**
+     * Advances the sequence by one attempt.
+     * <p/>
+     * The multiplier is applied on every call, the first one included.
+     *
+     * @return the delay in milliseconds to wait before the next attempt or
+     *         {@link BackOff#NEVER} if the max attempts or the max elapsed
+     *         time has been exceeded, or the context has been stopped
+     */
+    public long next() {
+        // A call to next when currentDelay is set to NEVER has no effects
+        // as this means that either the context is exhausted or it has been
+        // explicitly stopped
+        if (currentDelay != BackOff.NEVER) {
+
+            currentAttempts++;
+
+            if (currentAttempts > backOff.getMaxAttempts()) {
+                currentDelay = BackOff.NEVER;
+            } else if (currentElapsedTime > backOff.getMaxElapsedTime().toMillis()) {
+                currentDelay = BackOff.NEVER;
+            } else {
+                // NOTE(review): the delay stops growing only once it is past
+                // the max delay, so the returned value can be greater than
+                // the configured max delay - confirm this is intended
+                if (currentDelay <= backOff.getMaxDelay().toMillis()) {
+                    currentDelay = (long) (currentDelay * backOff.getMultiplier());
+                }
+
+                currentElapsedTime += currentDelay;
+            }
+        }
+
+        return currentDelay;
+    }
+
+    /**
+     * Brings the context back to its initial state: no attempts, no elapsed
+     * time and the delay configured on the {@link BackOff}.
+     *
+     * @return this context
+     */
+    public BackOffContext reset() {
+        this.currentAttempts = 0;
+        // Restore the configured delay as the constructor does: starting
+        // from 0 the delay could never grow again (0 * multiplier is still
+        // 0) so every following attempt would be scheduled without delay
+        this.currentDelay = backOff.getDelay().toMillis();
+        this.currentElapsedTime = 0;
+
+        return this;
+    }
+
+    /**
+     * Stops the context: counters are cleared and the current delay is set
+     * to {@link BackOff#NEVER} so that {@link #isExhausted()} returns
+     * <tt>true</tt> and {@link #next()} has no further effect.
+     *
+     * @return this context
+     */
+    public BackOffContext stop() {
+        this.currentAttempts = 0;
+        this.currentDelay = BackOff.NEVER;
+        this.currentElapsedTime = 0;
+
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
new file mode 100644
index 0000000..d82c824
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/backoff/BackOffTimer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.backoff;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.util.function.ThrowingFunction;
+
+
+/**
+ * Runs a function on a {@link ScheduledExecutorService}, waiting between two
+ * runs for the delay computed by a {@link BackOffContext}.
+ */
+public class BackOffTimer {
+    private final ScheduledExecutorService scheduler;
+
+    public BackOffTimer(ScheduledExecutorService scheduler) {
+        this.scheduler = scheduler;
+    }
+
+    /**
+     * Schedules the given function according to the given back-off policy.
+     * <p/>
+     * The function is invoked again as long as it returns <tt>true</tt> and
+     * the back-off context is not exhausted; the returned future is completed
+     * with the context when the sequence ends, or exceptionally if the
+     * function throws.
+     *
+     * @param backOff the back-off policy
+     * @param function the function to run, it receives the back-off context
+     * @return a future holding the back-off context, cancelling it stops the context
+     */
+    public CompletableFuture<BackOffContext> schedule(BackOff backOff, ThrowingFunction<BackOffContext, Boolean, Exception> function) {
+        final BackOffContext backOffContext = new BackOffContext(backOff);
+        final Task timerTask = new Task(backOffContext, function);
+
+        final long firstDelay = backOffContext.next();
+        if (firstDelay == BackOff.NEVER) {
+            // Nothing to run, the policy is exhausted from the beginning
+            timerTask.complete();
+        } else {
+            scheduler.schedule(timerTask, firstDelay, TimeUnit.MILLISECONDS);
+        }
+
+        return timerTask;
+    }
+
+    // ****************************************
+    // TimerTask
+    // ****************************************
+
+    /**
+     * The unit of work handed to the scheduler, it is also the future
+     * returned to the caller.
+     */
+    private final class Task extends CompletableFuture<BackOffContext> implements Runnable {
+        private final BackOffContext context;
+        private final ThrowingFunction<BackOffContext, Boolean, Exception> function;
+
+        Task(BackOffContext context, ThrowingFunction<BackOffContext, Boolean, Exception> function) {
+            this.context = context;
+            this.function = function;
+        }
+
+        @Override
+        public void run() {
+            final boolean finished = context.isExhausted() || isDone() || isCancelled();
+            if (finished) {
+                // Make sure the future is completed before bailing out
+                if (!isDone()) {
+                    complete();
+                }
+
+                return;
+            }
+
+            try {
+                final boolean again = function.apply(context);
+                if (!again) {
+                    // The function asked to not be invoked anymore
+                    complete();
+                    return;
+                }
+
+                final long nextDelay = context.next();
+                if (context.isExhausted()) {
+                    complete();
+                } else if (!(context.isExhausted() || isDone() || isCancelled())) {
+                    scheduler.schedule(this, nextDelay, TimeUnit.MILLISECONDS);
+                }
+            } catch (Exception e) {
+                completeExceptionally(e);
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // Stop the context first so that no further run is scheduled
+            context.stop();
+
+            return super.cancel(mayInterruptIfRunning);
+        }
+
+        /**
+         * Completes this future with the back-off context.
+         */
+        boolean complete() {
+            return super.complete(context);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e68111ec/camel-core/src/main/java/org/apache/camel/util/backoff/package.html
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/backoff/package.html b/camel-core/src/main/java/org/apache/camel/util/backoff/package.html
new file mode 100644
index 0000000..1323e4d
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/backoff/package.html
@@ -0,0 +1,25 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<html>
+<head>
+</head>
+<body>
+
+Utility classes for BackOff.
+
+</body>
+</html>