You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/12/07 10:22:16 UTC

(camel) branch ready updated: CAMEL-20189: camel-core: Add better API on scheduled poll consumer to force mark the consumer ready.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch ready
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/ready by this push:
     new e2422f43a88 CAMEL-20189: camel-core: Add better API on scheduled poll consumer to force mark the consumer ready.
e2422f43a88 is described below

commit e2422f43a880ea3b14fa20020a966ea281c93d8a
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Dec 7 11:22:00 2023 +0100

    CAMEL-20189: camel-core: Add better API on scheduled poll consumer to force mark the consumer ready.
---
 .../mbean/ManagedSchedulePollConsumerMBean.java    |  3 ++
 .../mbean/ManagedScheduledPollConsumer.java        |  5 ++++
 .../camel/support/ScheduledPollConsumer.java       | 32 +++++++++++++++-------
 .../support/ScheduledPollConsumerHealthCheck.java  |  6 ++--
 4 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
index 77aa84b89f7..71ee4463672 100644
--- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
+++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedSchedulePollConsumerMBean.java
@@ -93,6 +93,9 @@ public interface ManagedSchedulePollConsumerMBean extends ManagedConsumerMBean {
     @ManagedAttribute(description = "Whether a first pool attempt has been done (also if the consumer has been restarted)")
     boolean isFirstPollDone();
 
+    @ManagedAttribute(description = "Whether the consumer is ready to handle incoming traffic (used for readiness health-check)")
+    boolean isConsumerReady();
+
     @ManagedAttribute(description = "Total number of polls run")
     long getCounter();
 
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
index 7c39603de50..ed0a1b5267e 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedScheduledPollConsumer.java
@@ -158,6 +158,11 @@ public class ManagedScheduledPollConsumer extends ManagedConsumer implements Man
         return getConsumer().isFirstPollDone();
     }
 
+    @Override
+    public boolean isConsumerReady() {
+        return getConsumer().isConsumerReady();
+    }
+
     @Override
     public long getCounter() {
         return getConsumer().getCounter();
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
index 960680b6a0f..7081fa6c7f5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java
@@ -80,6 +80,7 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
     private volatile Map<String, Object> lastErrorDetails;
     private final AtomicLong counter = new AtomicLong();
     private volatile boolean firstPollDone;
+    private volatile boolean forceReady;
 
     public ScheduledPollConsumer(Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -481,28 +482,37 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
 
     /**
      * Whether a first pool attempt has been done (also if the consumer has been restarted).
-     *
-     * The health-check is using this information to know when the consumer is ready for readiness checks.
-     *
-     * @see #forceFirstPollDone()
      */
     public boolean isFirstPollDone() {
         return firstPollDone;
     }
 
+    /**
+     * Whether the consumer is ready, i.e. it has established a connection to its target system, or its first poll has
+     * been completed successfully.
+     *
+     * The health-check is using this information to know when the consumer is ready for readiness checks.
+     */
+    public boolean isConsumerReady() {
+        // we regard the consumer as ready if it was explicitly forced to be ready (component specific)
+        // or if it has completed its first poll without an exception being thrown
+        // while connecting to the target system and accepting data
+        return forceReady || firstPollDone;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
 
     /**
-     * Forces the consumer to be marked as ready. This can be used by components that
-     * need to mark this sooner than usual (default marked as ready after first poll is done).
-     * This allows health-checks to be ready before an entire poll is completed.
+     * Forces the consumer to be marked as ready. This can be used by components that need to mark this sooner than
+     * usual (default marked as ready after first poll is done). This allows health-checks to be ready before an entire
+     * poll is completed.
      *
-     * This is for example needed by the FTP component as polling a large file can take long time,
-     * causing a health-check to not be ready within reasonable time.
+     * This is for example needed by the FTP component as polling a large file can take long time, causing a
+     * health-check to not be ready within reasonable time.
      */
     protected void forceConsumerAsReady() {
-        firstPollDone = true;
+        forceReady = true;
     }
 
     /**
@@ -688,7 +698,9 @@ public abstract class ScheduledPollConsumer extends DefaultConsumer
         errorCounter = 0;
         successCounter = 0;
         counter.set(0);
+        // clear ready state
         firstPollDone = false;
+        forceReady = false;
 
         super.doStop();
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
index 69563f260bc..6565156bad4 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumerHealthCheck.java
@@ -83,13 +83,13 @@ public class ScheduledPollConsumerHealthCheck implements HealthCheck {
         }
 
         long ec = consumer.getErrorCounter();
-        boolean first = consumer.isFirstPollDone();
+        boolean ready = consumer.isConsumerReady();
         Throwable cause = consumer.getLastError();
 
         boolean healthy = ec == 0;
         boolean readiness = kind.equals(Kind.READINESS);
-        if (readiness && !first) {
-            // special for readiness check before first poll is done
+        if (readiness && !ready) {
+            // special handling for the readiness check while the consumer is not yet ready (e.g. before first poll is done)
             // if initial state is UP or UNKNOWN then return that
             // otherwise we are DOWN
             boolean down = builder.state().equals(State.DOWN);