You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/11/22 09:58:19 UTC
[camel] 02/02: CAMEL-17215: camel-jbang - Reload should restart max-messages duration
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 671f842ff7017a659dd13baceef94990fe8e95f3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Nov 22 10:57:28 2021 +0100
CAMEL-17215: camel-jbang - Reload should restart max-messages duration
---
.../main/java/org/apache/camel/spi/CamelEvent.java | 8 +++++
.../java/org/apache/camel/spi/EventFactory.java | 8 +++++
.../camel/impl/event/DefaultEventFactory.java | 5 +++
.../camel/impl/event/RouteReloadedEvent.java | 34 +++++++++++++++++++
.../camel/main/MainDurationEventNotifier.java | 32 +++++++++++++++---
.../apache/camel/main/MainShutdownStrategy.java | 7 +++-
.../java/org/apache/camel/main/MainSupport.java | 13 +++++---
.../camel/main/SimpleMainShutdownStrategy.java | 34 ++++++++++++++++---
.../java/org/apache/camel/support/EventHelper.java | 39 ++++++++++++++++++++++
.../camel/support/RouteWatcherReloadStrategy.java | 5 +++
.../ROOT/pages/camel-3x-upgrade-guide-3_14.adoc | 4 +++
.../apache/camel/dsl/jbang/core/commands/Run.java | 21 +++++++++---
.../camel/dsl/jbang/core/common/RuntimeUtil.java | 8 ++---
13 files changed, 196 insertions(+), 22 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
index 1de38c0..6a34fa8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
@@ -55,6 +55,7 @@ public interface CamelEvent {
RoutesStopped,
RouteAdded,
RouteRemoved,
+ RouteReloaded,
RouteStarting,
RouteStarted,
RouteStopping,
@@ -347,6 +348,13 @@ public interface CamelEvent {
}
}
+ interface RouteReloadedEvent extends RouteEvent {
+ @Override
+ default Type getType() {
+ return Type.RouteReloaded;
+ }
+ }
+
interface RouteStartingEvent extends RouteEvent {
@Override
default Type getType() {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
index 6cc7994..fa402e9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
@@ -196,6 +196,14 @@ public interface EventFactory {
CamelEvent createRouteRemovedEvent(Route route);
/**
+ * Creates an {@link CamelEvent} for {@link Route} has been reloaded successfully.
+ *
+ * @param route the route
+ * @return the reloaded event
+ */
+ CamelEvent createRouteReloaded(Route route);
+
+ /**
* Creates an {@link CamelEvent} when an {@link org.apache.camel.Exchange} has been created
*
* @param exchange the exchange
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
index 8472242..f04c267 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
@@ -131,6 +131,11 @@ public class DefaultEventFactory implements EventFactory {
}
@Override
+ public CamelEvent createRouteReloaded(Route route) {
+ return new RouteReloadedEvent(route);
+ }
+
+ @Override
public CamelEvent createExchangeCreatedEvent(Exchange exchange) {
return new ExchangeCreatedEvent(exchange);
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java
new file mode 100644
index 0000000..651bc22
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+public class RouteReloadedEvent extends AbstractRouteEvent implements CamelEvent.RouteReloadedEvent {
+
+ private static final long serialVersionUID = 7966471393751298718L;
+
+ public RouteReloadedEvent(Route source) {
+ super(source);
+ }
+
+ @Override
+ public String toString() {
+ return "Reloaded route: " + getRoute().getId();
+ }
+}
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
index bd1ee7b..9e57971 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent;
import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent;
import org.apache.camel.spi.CamelEvent.ExchangeFailedEvent;
+import org.apache.camel.spi.CamelEvent.RouteReloadedEvent;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
@@ -60,6 +61,14 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
@Override
public void notify(CamelEvent event) throws Exception {
+ try {
+ doNotify(event);
+ } catch (Exception e) {
+ LOG.warn("Error during processing CamelEvent: " + event + ". This exception is ignored.", e);
+ }
+ }
+
+ protected void doNotify(CamelEvent event) throws Exception {
// ignore any event that is received if shutdown is in process
if (!shutdownStrategy.isRunAllowed()) {
return;
@@ -67,13 +76,24 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
boolean begin = event instanceof ExchangeCreatedEvent;
boolean complete = event instanceof ExchangeCompletedEvent || event instanceof ExchangeFailedEvent;
+ boolean reloaded = event instanceof RouteReloadedEvent;
+
+ if (reloaded) {
+ LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
+ shutdownStrategy.restartAwait();
+ doneMessages.set(0);
+ if (watch != null) {
+ watch.restart();
+ }
+ return;
+ }
if (maxMessages > 0 && complete) {
boolean result = doneMessages.incrementAndGet() >= maxMessages;
LOG.trace("Duration max messages check {} >= {} -> {}", doneMessages.get(), maxMessages, result);
if (result && shutdownStrategy.isRunAllowed()) {
- LOG.info("Duration max messages triggering shutdown of the JVM.");
+ LOG.info("Duration max messages triggering shutdown of the JVM");
// use thread to stop Camel as otherwise we would block current thread
camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start();
}
@@ -81,15 +101,17 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
// idle reacts on both incoming and complete messages
if (maxIdleSeconds > 0 && (begin || complete)) {
- LOG.trace("Message activity so restarting stop watch");
- watch.restart();
+ if (watch != null) {
+ LOG.trace("Message activity so restarting stop watch");
+ watch.restart();
+ }
}
}
@Override
public boolean isEnabled(CamelEvent event) {
return event instanceof ExchangeCreatedEvent || event instanceof ExchangeCompletedEvent
- || event instanceof ExchangeFailedEvent;
+ || event instanceof ExchangeFailedEvent || event instanceof RouteReloadedEvent;
}
@Override
@@ -156,7 +178,7 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
LOG.trace("Duration max idle check {} >= {} -> {}", seconds, maxIdleSeconds, result);
if (result && shutdownStrategy.isRunAllowed()) {
- LOG.info("Duration max idle triggering shutdown of the JVM.");
+ LOG.info("Duration max idle triggering shutdown of the JVM");
// use thread to stop Camel as otherwise we would block current thread
camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start();
}
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java b/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
index 46ab437..084ea6f 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
@@ -62,11 +62,16 @@ public interface MainShutdownStrategy {
void await() throws InterruptedException;
/**
- * Waiting for Camel Main to complete.
+ * Waiting for Camel Main to complete (with timeout).
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return true if Camel Main was completed before the timeout, false if timeout was triggered.
*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
+
+ /**
+ * This is used for restarting await with timeout.
+ */
+ void restartAwait();
}
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
index 86229ca..21b2eb2 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
@@ -270,9 +270,11 @@ public abstract class MainSupport extends BaseMainSupport {
@Override
protected void configureLifecycle(CamelContext camelContext) throws Exception {
- if (mainConfigurationProperties.getDurationMaxMessages() > 0
+ if (mainConfigurationProperties.getDurationMaxSeconds() > 0
+ || mainConfigurationProperties.getDurationMaxMessages() > 0
|| mainConfigurationProperties.getDurationMaxIdleSeconds() > 0) {
- // register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed
+ // register lifecycle, so we can trigger to shutdown the JVM when maximum number of messages has been processed
+ // (we must use the event notifier also for max seconds only to support restarting duration if routes are reloaded)
EventNotifier notifier = new MainDurationEventNotifier(
camelContext,
mainConfigurationProperties.getDurationMaxMessages(),
@@ -285,7 +287,7 @@ public abstract class MainSupport extends BaseMainSupport {
camelContext.getManagementStrategy().addEventNotifier(notifier);
}
- // register lifecycle so we are notified in Camel is stopped from JMX or somewhere else
+ // register lifecycle, so we are notified in Camel is stopped from JMX or somewhere else
camelContext.addLifecycleStrategy(new MainLifecycleStrategy(shutdownStrategy));
}
@@ -298,7 +300,10 @@ public abstract class MainSupport extends BaseMainSupport {
int exit = durationHitExitCode;
if (sec > 0) {
LOG.info("Waiting until complete: Duration max {} seconds", sec);
- shutdownStrategy.await(sec, TimeUnit.SECONDS);
+ boolean zero = shutdownStrategy.await(sec, TimeUnit.SECONDS);
+ if (!zero) {
+ LOG.info("Duration max seconds triggering shutdown of the JVM");
+ }
exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, exit);
shutdownStrategy.shutdown();
} else if (idle > 0 || max > 0) {
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java b/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
index 18b7ae4..3db20f9 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
@@ -25,12 +25,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Base class for {@link MainShutdownStrategy}.
+ */
public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
protected static final Logger LOG = LoggerFactory.getLogger(SimpleMainShutdownStrategy.class);
private final Set<ShutdownEventListener> listeners = new LinkedHashSet<>();
private final AtomicBoolean completed;
- private final CountDownLatch latch;
+ private final AtomicBoolean timeoutEnabled = new AtomicBoolean();
+ private final AtomicBoolean restarting = new AtomicBoolean();
+ private volatile CountDownLatch latch;
public SimpleMainShutdownStrategy() {
this.completed = new AtomicBoolean();
@@ -50,7 +55,7 @@ public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
@Override
public boolean shutdown() {
if (completed.compareAndSet(false, true)) {
- LOG.debug("Setting shutdown completed state from false to true");
+ LOG.debug("Shutdown called");
latch.countDown();
for (ShutdownEventListener l : listeners) {
try {
@@ -75,7 +80,28 @@ public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- LOG.debug("Await shutdown to complete with timeout: {} {}", timeout, unit);
- return latch.await(timeout, unit);
+ timeoutEnabled.set(true);
+
+ while (true) {
+ LOG.debug("Await shutdown to complete with timeout: {} {}", timeout, unit);
+ boolean zero = latch.await(timeout, unit);
+ if (zero && restarting.compareAndSet(true, false)) {
+ // re-create new latch to restart
+ LOG.debug("Restarting await shutdown to complete with timeout: {} {}", timeout, unit);
+ latch = new CountDownLatch(1);
+ } else {
+ return zero;
+ }
+ }
+ }
+
+ @Override
+ public void restartAwait() {
+ // only restart if timeout is in use
+ if (timeoutEnabled.get()) {
+ LOG.trace("Restarting await with timeout");
+ restarting.set(true);
+ latch.countDown();
+ }
}
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
index bf904f9..fcc8f39 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
@@ -462,6 +462,45 @@ public final class EventHelper {
return answer;
}
+ public static boolean notifyRouteReloaded(CamelContext context, Route route) {
+ ManagementStrategy management = context.getManagementStrategy();
+ if (management == null) {
+ return false;
+ }
+
+ EventFactory factory = management.getEventFactory();
+ if (factory == null) {
+ return false;
+ }
+
+ List<EventNotifier> notifiers = management.getStartedEventNotifiers();
+ if (notifiers == null || notifiers.isEmpty()) {
+ return false;
+ }
+
+ boolean answer = false;
+ CamelEvent event = null;
+ for (EventNotifier notifier : notifiers) {
+ if (notifier.isDisabled()) {
+ continue;
+ }
+ if (notifier.isIgnoreRouteEvents()) {
+ continue;
+ }
+
+ if (event == null) {
+ // only create event once
+ event = factory.createRouteReloaded(route);
+ if (event == null) {
+ // factory could not create event so exit
+ return false;
+ }
+ }
+ answer |= doNotifyEvent(notifier, event);
+ }
+ return answer;
+ }
+
public static boolean notifyExchangeCreated(CamelContext context, Exchange exchange) {
ManagementStrategy management = context.getManagementStrategy();
if (management == null) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
index ddcc77d..7007540 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
@@ -127,6 +127,11 @@ public class RouteWatcherReloadStrategy extends FileWatcherResourceReloadStrateg
if (!ids.isEmpty()) {
LOG.info("Reloaded routes: {}", String.join(", ", ids));
}
+ // fire events for routes reloaded
+ for (String id : ids) {
+ Route route = getCamelContext().getRoute(id);
+ EventHelper.notifyRouteReloaded(getCamelContext(), route);
+ }
if (!removeAllRoutes) {
// if not all previous routes are removed then to have safe route reloading
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
index b44e8ab..cc637bc 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
@@ -10,3 +10,7 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
Added method `updateRoutesToCamelContext` to `org.apache.camel.RoutesBuilder` interface.
+=== camel-jbang
+
+The option `debug-level` has been renamed to `logging-level` because the option is for configuring the logging level.
+
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
index 6aeed9b..528e05e 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
@@ -46,8 +46,8 @@ class Run implements Callable<Integer> {
private boolean helpRequested;
//CHECKSTYLE:ON
- @Option(names = { "--debug-level" }, defaultValue = "info", description = "Default debug level")
- private String debugLevel;
+ @Option(names = { "--logging-level" }, defaultValue = "info", description = "Logging level")
+ private String loggingLevel;
@Option(names = { "--stop" }, description = "Stop all running instances of Camel JBang")
private boolean stopRequested;
@@ -55,6 +55,13 @@ class Run implements Callable<Integer> {
@Option(names = { "--max-messages" }, defaultValue = "0", description = "Max number of messages to process before stopping")
private int maxMessages;
+ @Option(names = { "--max-seconds" }, defaultValue = "0", description = "Max seconds to run before stopping")
+ private int maxSeconds;
+
+ @Option(names = { "--max-idle-seconds" }, defaultValue = "0",
+ description = "For how long time in seconds Camel can be idle before stopping")
+ private int maxIdleSeconds;
+
@Option(names = { "--reload" }, description = "Enables live reload when source file is changed (saved)")
private boolean reload;
@@ -96,11 +103,17 @@ class Run implements Callable<Integer> {
if (maxMessages > 0) {
main.addInitialProperty("camel.main.durationMaxMessages", String.valueOf(maxMessages));
}
+ if (maxSeconds > 0) {
+ main.addInitialProperty("camel.main.durationMaxSeconds", String.valueOf(maxSeconds));
+ }
+ if (maxIdleSeconds > 0) {
+ main.addInitialProperty("camel.main.durationMaxIdleSeconds", String.valueOf(maxIdleSeconds));
+ }
- RuntimeUtil.configureLog(debugLevel);
+ RuntimeUtil.configureLog(loggingLevel);
// shutdown quickly
- main.addInitialProperty("camel.main.shutdown-timeout", "5");
+ main.addInitialProperty("camel.main.shutdownTimeout", "5");
// turn off lightweight if we have routes reload enabled
main.addInitialProperty("camel.main.routesReloadEnabled", reload ? "true" : "false");
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
index ae4f4a3..bd9eeb4 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
@@ -30,10 +30,10 @@ public final class RuntimeUtil {
private RuntimeUtil() {
}
- public static void configureLog(String debugLevel) {
- debugLevel = debugLevel.toLowerCase();
+ public static void configureLog(String level) {
+ level = level.toLowerCase();
- switch (debugLevel) {
+ switch (level) {
case "trace":
Configurator.setRootLevel(Level.TRACE);
break;
@@ -54,7 +54,7 @@ public final class RuntimeUtil {
break;
default: {
Configurator.setRootLevel(Level.INFO);
- LoggerFactory.getLogger(RuntimeUtil.class).warn("Invalid debug level: {}", debugLevel);
+ LoggerFactory.getLogger(RuntimeUtil.class).warn("Invalid logging level: {}", level);
}
}
}