You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/11/22 09:58:19 UTC

[camel] 02/02: CAMEL-17215: camel-jbang - Reload should restart max-messages duration

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 671f842ff7017a659dd13baceef94990fe8e95f3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Nov 22 10:57:28 2021 +0100

    CAMEL-17215: camel-jbang - Reload should restart max-messages duration
---
 .../main/java/org/apache/camel/spi/CamelEvent.java |  8 +++++
 .../java/org/apache/camel/spi/EventFactory.java    |  8 +++++
 .../camel/impl/event/DefaultEventFactory.java      |  5 +++
 .../camel/impl/event/RouteReloadedEvent.java       | 34 +++++++++++++++++++
 .../camel/main/MainDurationEventNotifier.java      | 32 +++++++++++++++---
 .../apache/camel/main/MainShutdownStrategy.java    |  7 +++-
 .../java/org/apache/camel/main/MainSupport.java    | 13 +++++---
 .../camel/main/SimpleMainShutdownStrategy.java     | 34 ++++++++++++++++---
 .../java/org/apache/camel/support/EventHelper.java | 39 ++++++++++++++++++++++
 .../camel/support/RouteWatcherReloadStrategy.java  |  5 +++
 .../ROOT/pages/camel-3x-upgrade-guide-3_14.adoc    |  4 +++
 .../apache/camel/dsl/jbang/core/commands/Run.java  | 21 +++++++++---
 .../camel/dsl/jbang/core/common/RuntimeUtil.java   |  8 ++---
 13 files changed, 196 insertions(+), 22 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
index 1de38c0..6a34fa8 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/CamelEvent.java
@@ -55,6 +55,7 @@ public interface CamelEvent {
         RoutesStopped,
         RouteAdded,
         RouteRemoved,
+        RouteReloaded,
         RouteStarting,
         RouteStarted,
         RouteStopping,
@@ -347,6 +348,13 @@ public interface CamelEvent {
         }
     }
 
+    interface RouteReloadedEvent extends RouteEvent {
+        @Override
+        default Type getType() {
+            return Type.RouteReloaded;
+        }
+    }
+
     interface RouteStartingEvent extends RouteEvent {
         @Override
         default Type getType() {
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
index 6cc7994..fa402e9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/EventFactory.java
@@ -196,6 +196,14 @@ public interface EventFactory {
     CamelEvent createRouteRemovedEvent(Route route);
 
     /**
+     * Creates a {@link CamelEvent} for when a {@link Route} has been reloaded successfully.
+     *
+     * @param  route the route
+     * @return       the reloaded event
+     */
+    CamelEvent createRouteReloaded(Route route);
+
+    /**
      * Creates an {@link CamelEvent} when an {@link org.apache.camel.Exchange} has been created
      *
      * @param  exchange the exchange
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
index 8472242..f04c267 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/DefaultEventFactory.java
@@ -131,6 +131,11 @@ public class DefaultEventFactory implements EventFactory {
     }
 
     @Override
+    public CamelEvent createRouteReloaded(Route route) {
+        return new RouteReloadedEvent(route);
+    }
+
+    @Override
     public CamelEvent createExchangeCreatedEvent(Exchange exchange) {
         return new ExchangeCreatedEvent(exchange);
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java
new file mode 100644
index 0000000..651bc22
--- /dev/null
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/event/RouteReloadedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.event;
+
+import org.apache.camel.Route;
+import org.apache.camel.spi.CamelEvent;
+
+public class RouteReloadedEvent extends AbstractRouteEvent implements CamelEvent.RouteReloadedEvent {
+
+    private static final long serialVersionUID = 7966471393751298718L;
+
+    public RouteReloadedEvent(Route source) {
+        super(source);
+    }
+
+    @Override
+    public String toString() {
+        return "Reloaded route: " + getRoute().getId();
+    }
+}
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
index bd1ee7b..9e57971 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -26,6 +26,7 @@ import org.apache.camel.spi.CamelEvent;
 import org.apache.camel.spi.CamelEvent.ExchangeCompletedEvent;
 import org.apache.camel.spi.CamelEvent.ExchangeCreatedEvent;
 import org.apache.camel.spi.CamelEvent.ExchangeFailedEvent;
+import org.apache.camel.spi.CamelEvent.RouteReloadedEvent;
 import org.apache.camel.support.EventNotifierSupport;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
@@ -60,6 +61,14 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
 
     @Override
     public void notify(CamelEvent event) throws Exception {
+        try {
+            doNotify(event);
+        } catch (Exception e) {
+            LOG.warn("Error during processing CamelEvent: " + event + ". This exception is ignored.", e);
+        }
+    }
+
+    protected void doNotify(CamelEvent event) throws Exception {
         // ignore any event that is received if shutdown is in process
         if (!shutdownStrategy.isRunAllowed()) {
             return;
@@ -67,13 +76,24 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
 
         boolean begin = event instanceof ExchangeCreatedEvent;
         boolean complete = event instanceof ExchangeCompletedEvent || event instanceof ExchangeFailedEvent;
+        boolean reloaded = event instanceof RouteReloadedEvent;
+
+        if (reloaded) {
+            LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
+            shutdownStrategy.restartAwait();
+            doneMessages.set(0);
+            if (watch != null) {
+                watch.restart();
+            }
+            return;
+        }
 
         if (maxMessages > 0 && complete) {
             boolean result = doneMessages.incrementAndGet() >= maxMessages;
             LOG.trace("Duration max messages check {} >= {} -> {}", doneMessages.get(), maxMessages, result);
 
             if (result && shutdownStrategy.isRunAllowed()) {
-                LOG.info("Duration max messages triggering shutdown of the JVM.");
+                LOG.info("Duration max messages triggering shutdown of the JVM");
                 // use thread to stop Camel as otherwise we would block current thread
                 camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start();
             }
@@ -81,15 +101,17 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
 
         // idle reacts on both incoming and complete messages
         if (maxIdleSeconds > 0 && (begin || complete)) {
-            LOG.trace("Message activity so restarting stop watch");
-            watch.restart();
+            if (watch != null) {
+                LOG.trace("Message activity so restarting stop watch");
+                watch.restart();
+            }
         }
     }
 
     @Override
     public boolean isEnabled(CamelEvent event) {
         return event instanceof ExchangeCreatedEvent || event instanceof ExchangeCompletedEvent
-                || event instanceof ExchangeFailedEvent;
+                || event instanceof ExchangeFailedEvent || event instanceof RouteReloadedEvent;
     }
 
     @Override
@@ -156,7 +178,7 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
         LOG.trace("Duration max idle check {} >= {} -> {}", seconds, maxIdleSeconds, result);
 
         if (result && shutdownStrategy.isRunAllowed()) {
-            LOG.info("Duration max idle triggering shutdown of the JVM.");
+            LOG.info("Duration max idle triggering shutdown of the JVM");
             // use thread to stop Camel as otherwise we would block current thread
             camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask).start();
         }
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java b/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
index 46ab437..084ea6f 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainShutdownStrategy.java
@@ -62,11 +62,16 @@ public interface MainShutdownStrategy {
     void await() throws InterruptedException;
 
     /**
-     * Waiting for Camel Main to complete.
+     * Waiting for Camel Main to complete (with timeout).
      *
      * @param  timeout the maximum time to wait
      * @param  unit    the time unit of the {@code timeout} argument
      * @return         true if Camel Main was completed before the timeout, false if timeout was triggered.
      */
     boolean await(long timeout, TimeUnit unit) throws InterruptedException;
+
+    /**
+     * Restarts a pending {@link #await(long, TimeUnit)} so that its timeout period starts over.
+     */
+    void restartAwait();
 }
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
index 86229ca..21b2eb2 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainSupport.java
@@ -270,9 +270,11 @@ public abstract class MainSupport extends BaseMainSupport {
 
     @Override
     protected void configureLifecycle(CamelContext camelContext) throws Exception {
-        if (mainConfigurationProperties.getDurationMaxMessages() > 0
+        if (mainConfigurationProperties.getDurationMaxSeconds() > 0
+                || mainConfigurationProperties.getDurationMaxMessages() > 0
                 || mainConfigurationProperties.getDurationMaxIdleSeconds() > 0) {
-            // register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed
+            // register lifecycle, so we can trigger shutdown of the JVM when the maximum number of messages has been processed
+            // (the event notifier is also needed when only max seconds is set, to support restarting the duration if routes are reloaded)
             EventNotifier notifier = new MainDurationEventNotifier(
                     camelContext,
                     mainConfigurationProperties.getDurationMaxMessages(),
@@ -285,7 +287,7 @@ public abstract class MainSupport extends BaseMainSupport {
             camelContext.getManagementStrategy().addEventNotifier(notifier);
         }
 
-        // register lifecycle so we are notified in Camel is stopped from JMX or somewhere else
+        // register lifecycle, so we are notified if Camel is stopped from JMX or somewhere else
         camelContext.addLifecycleStrategy(new MainLifecycleStrategy(shutdownStrategy));
     }
 
@@ -298,7 +300,10 @@ public abstract class MainSupport extends BaseMainSupport {
                 int exit = durationHitExitCode;
                 if (sec > 0) {
                     LOG.info("Waiting until complete: Duration max {} seconds", sec);
-                    shutdownStrategy.await(sec, TimeUnit.SECONDS);
+                    boolean zero = shutdownStrategy.await(sec, TimeUnit.SECONDS);
+                    if (!zero) {
+                        LOG.info("Duration max seconds triggering shutdown of the JVM");
+                    }
                     exitCode.compareAndSet(UNINITIALIZED_EXIT_CODE, exit);
                     shutdownStrategy.shutdown();
                 } else if (idle > 0 || max > 0) {
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java b/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
index 18b7ae4..3db20f9 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/SimpleMainShutdownStrategy.java
@@ -25,12 +25,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Base class for {@link MainShutdownStrategy}.
+ */
 public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
     protected static final Logger LOG = LoggerFactory.getLogger(SimpleMainShutdownStrategy.class);
 
     private final Set<ShutdownEventListener> listeners = new LinkedHashSet<>();
     private final AtomicBoolean completed;
-    private final CountDownLatch latch;
+    private final AtomicBoolean timeoutEnabled = new AtomicBoolean();
+    private final AtomicBoolean restarting = new AtomicBoolean();
+    private volatile CountDownLatch latch;
 
     public SimpleMainShutdownStrategy() {
         this.completed = new AtomicBoolean();
@@ -50,7 +55,7 @@ public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
     @Override
     public boolean shutdown() {
         if (completed.compareAndSet(false, true)) {
-            LOG.debug("Setting shutdown completed state from false to true");
+            LOG.debug("Shutdown called");
             latch.countDown();
             for (ShutdownEventListener l : listeners) {
                 try {
@@ -75,7 +80,28 @@ public class SimpleMainShutdownStrategy implements MainShutdownStrategy {
 
     @Override
     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
-        LOG.debug("Await shutdown to complete with timeout: {} {}", timeout, unit);
-        return latch.await(timeout, unit);
+        timeoutEnabled.set(true);
+
+        while (true) {
+            LOG.debug("Await shutdown to complete with timeout: {} {}", timeout, unit);
+            boolean zero = latch.await(timeout, unit);
+            if (zero && restarting.compareAndSet(true, false)) {
+                // re-create new latch to restart
+                LOG.debug("Restarting await shutdown to complete with timeout: {} {}", timeout, unit);
+                latch = new CountDownLatch(1);
+            } else {
+                return zero;
+            }
+        }
+    }
+
+    @Override
+    public void restartAwait() {
+        // only restart if timeout is in use
+        if (timeoutEnabled.get()) {
+            LOG.trace("Restarting await with timeout");
+            restarting.set(true);
+            latch.countDown();
+        }
     }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
index bf904f9..fcc8f39 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/EventHelper.java
@@ -462,6 +462,45 @@ public final class EventHelper {
         return answer;
     }
 
+    public static boolean notifyRouteReloaded(CamelContext context, Route route) {
+        ManagementStrategy management = context.getManagementStrategy();
+        if (management == null) {
+            return false;
+        }
+
+        EventFactory factory = management.getEventFactory();
+        if (factory == null) {
+            return false;
+        }
+
+        List<EventNotifier> notifiers = management.getStartedEventNotifiers();
+        if (notifiers == null || notifiers.isEmpty()) {
+            return false;
+        }
+
+        boolean answer = false;
+        CamelEvent event = null;
+        for (EventNotifier notifier : notifiers) {
+            if (notifier.isDisabled()) {
+                continue;
+            }
+            if (notifier.isIgnoreRouteEvents()) {
+                continue;
+            }
+
+            if (event == null) {
+                // only create event once
+                event = factory.createRouteReloaded(route);
+                if (event == null) {
+                    // factory could not create event so exit
+                    return false;
+                }
+            }
+            answer |= doNotifyEvent(notifier, event);
+        }
+        return answer;
+    }
+
     public static boolean notifyExchangeCreated(CamelContext context, Exchange exchange) {
         ManagementStrategy management = context.getManagementStrategy();
         if (management == null) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
index ddcc77d..7007540 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/RouteWatcherReloadStrategy.java
@@ -127,6 +127,11 @@ public class RouteWatcherReloadStrategy extends FileWatcherResourceReloadStrateg
                     if (!ids.isEmpty()) {
                         LOG.info("Reloaded routes: {}", String.join(", ", ids));
                     }
+                    // fire events for routes reloaded
+                    for (String id : ids) {
+                        Route route = getCamelContext().getRoute(id);
+                        EventHelper.notifyRouteReloaded(getCamelContext(), route);
+                    }
 
                     if (!removeAllRoutes) {
                         // if not all previous routes are removed then to have safe route reloading
diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
index b44e8ab..cc637bc 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_14.adoc
@@ -10,3 +10,7 @@ from both 3.0 to 3.1 and 3.1 to 3.2.
 
 Added method `updateRoutesToCamelContext` to `org.apache.camel.RoutesBuilder` interface.
 
+=== camel-jbang
+
+The option `debug-level` has been renamed to `logging-level` because the option is for configuring the logging level.
+
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
index 6aeed9b..528e05e 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/Run.java
@@ -46,8 +46,8 @@ class Run implements Callable<Integer> {
     private boolean helpRequested;
     //CHECKSTYLE:ON
 
-    @Option(names = { "--debug-level" }, defaultValue = "info", description = "Default debug level")
-    private String debugLevel;
+    @Option(names = { "--logging-level" }, defaultValue = "info", description = "Logging level")
+    private String loggingLevel;
 
     @Option(names = { "--stop" }, description = "Stop all running instances of Camel JBang")
     private boolean stopRequested;
@@ -55,6 +55,13 @@ class Run implements Callable<Integer> {
     @Option(names = { "--max-messages" }, defaultValue = "0", description = "Max number of messages to process before stopping")
     private int maxMessages;
 
+    @Option(names = { "--max-seconds" }, defaultValue = "0", description = "Max seconds to run before stopping")
+    private int maxSeconds;
+
+    @Option(names = { "--max-idle-seconds" }, defaultValue = "0",
+            description = "For how long time in seconds Camel can be idle before stopping")
+    private int maxIdleSeconds;
+
     @Option(names = { "--reload" }, description = "Enables live reload when source file is changed (saved)")
     private boolean reload;
 
@@ -96,11 +103,17 @@ class Run implements Callable<Integer> {
         if (maxMessages > 0) {
             main.addInitialProperty("camel.main.durationMaxMessages", String.valueOf(maxMessages));
         }
+        if (maxSeconds > 0) {
+            main.addInitialProperty("camel.main.durationMaxSeconds", String.valueOf(maxSeconds));
+        }
+        if (maxIdleSeconds > 0) {
+            main.addInitialProperty("camel.main.durationMaxIdleSeconds", String.valueOf(maxIdleSeconds));
+        }
 
-        RuntimeUtil.configureLog(debugLevel);
+        RuntimeUtil.configureLog(loggingLevel);
 
         // shutdown quickly
-        main.addInitialProperty("camel.main.shutdown-timeout", "5");
+        main.addInitialProperty("camel.main.shutdownTimeout", "5");
         // turn off lightweight if we have routes reload enabled
         main.addInitialProperty("camel.main.routesReloadEnabled", reload ? "true" : "false");
 
diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
index ae4f4a3..bd9eeb4 100644
--- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
+++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/common/RuntimeUtil.java
@@ -30,10 +30,10 @@ public final class RuntimeUtil {
     private RuntimeUtil() {
     }
 
-    public static void configureLog(String debugLevel) {
-        debugLevel = debugLevel.toLowerCase();
+    public static void configureLog(String level) {
+        level = level.toLowerCase();
 
-        switch (debugLevel) {
+        switch (level) {
             case "trace":
                 Configurator.setRootLevel(Level.TRACE);
                 break;
@@ -54,7 +54,7 @@ public final class RuntimeUtil {
                 break;
             default: {
                 Configurator.setRootLevel(Level.INFO);
-                LoggerFactory.getLogger(RuntimeUtil.class).warn("Invalid debug level: {}", debugLevel);
+                LoggerFactory.getLogger(RuntimeUtil.class).warn("Invalid logging level: {}", level);
             }
         }
     }