You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/18 18:36:15 UTC

[camel] branch main updated: CAMEL-17810: add initial support for the resume API with the master component

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new fab7ae7  CAMEL-17810: add initial support for the resume API with the master component
fab7ae7 is described below

commit fab7ae7c86ae514da4bd213c80296c9e63c4f6b6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Mar 18 15:18:14 2022 +0100

    CAMEL-17810: add initial support for the resume API with the master component
---
 .../camel/component/master/MasterConsumer.java       | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 8489ebe..a79ce85 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,6 +21,8 @@ import java.util.Optional;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.ResumeStrategy;
 import org.apache.camel.StartupListener;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.api.management.ManagedAttribute;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * A consumer which is only really active when the {@link CamelClusterView} has the leadership.
  */
 @ManagedResource(description = "Managed Master Consumer")
-public class MasterConsumer extends DefaultConsumer {
+public class MasterConsumer extends DefaultConsumer implements ResumeAware {
     private static final transient Logger LOG = LoggerFactory.getLogger(MasterConsumer.class);
 
     private final CamelClusterService clusterService;
@@ -48,6 +50,7 @@ public class MasterConsumer extends DefaultConsumer {
     private final CamelClusterEventListener.Leadership leadershipListener;
     private volatile Consumer delegatedConsumer;
     private volatile CamelClusterView view;
+    private ResumeStrategy resumeStrategy;
 
     public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) {
         super(masterEndpoint, processor);
@@ -60,6 +63,16 @@ public class MasterConsumer extends DefaultConsumer {
     }
 
     @Override
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    @Override
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    @Override
     protected void doStart() throws Exception {
         super.doStart();
 
@@ -125,6 +138,11 @@ public class MasterConsumer extends DefaultConsumer {
             getEndpoint().getCamelContext().addStartupListener((StartupListener) delegatedConsumer);
         }
 
+        if (delegatedConsumer instanceof ResumeAware) {
+            LOG.info("Setting up the resume strategy for the delegated consumer");
+            ((ResumeAware) delegatedConsumer).setResumeStrategy(resumeStrategy);
+        }
+
         ServiceHelper.startService(delegatedEndpoint, delegatedConsumer);
 
         LOG.info("Leadership taken. Consumer started: {}", delegatedEndpoint);