You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text copy.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/18 18:36:15 UTC
[camel] branch main updated: CAMEL-17810: add initial support for the resume API with the master component
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fab7ae7 CAMEL-17810: add initial support for the resume API with the master component
fab7ae7 is described below
commit fab7ae7c86ae514da4bd213c80296c9e63c4f6b6
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Mar 18 15:18:14 2022 +0100
CAMEL-17810: add initial support for the resume API with the master component
---
.../camel/component/master/MasterConsumer.java | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
index 8489ebe..a79ce85 100644
--- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
+++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java
@@ -21,6 +21,8 @@ import java.util.Optional;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.ResumeStrategy;
import org.apache.camel.StartupListener;
import org.apache.camel.SuspendableService;
import org.apache.camel.api.management.ManagedAttribute;
@@ -38,7 +40,7 @@ import org.slf4j.LoggerFactory;
* A consumer which is only really active when the {@link CamelClusterView} has the leadership.
*/
@ManagedResource(description = "Managed Master Consumer")
-public class MasterConsumer extends DefaultConsumer {
+public class MasterConsumer extends DefaultConsumer implements ResumeAware {
private static final transient Logger LOG = LoggerFactory.getLogger(MasterConsumer.class);
private final CamelClusterService clusterService;
@@ -48,6 +50,7 @@ public class MasterConsumer extends DefaultConsumer {
private final CamelClusterEventListener.Leadership leadershipListener;
private volatile Consumer delegatedConsumer;
private volatile CamelClusterView view;
+ private ResumeStrategy resumeStrategy;
public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) {
super(masterEndpoint, processor);
@@ -60,6 +63,16 @@ public class MasterConsumer extends DefaultConsumer {
}
@Override
+ public ResumeStrategy getResumeStrategy() {
+ return resumeStrategy;
+ }
+
+ @Override
+ public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+ this.resumeStrategy = resumeStrategy;
+ }
+
+ @Override
protected void doStart() throws Exception {
super.doStart();
@@ -125,6 +138,11 @@ public class MasterConsumer extends DefaultConsumer {
getEndpoint().getCamelContext().addStartupListener((StartupListener) delegatedConsumer);
}
+ if (delegatedConsumer instanceof ResumeAware) {
+ LOG.info("Setting up the resume strategy for the delegated consumer");
+ ((ResumeAware) delegatedConsumer).setResumeStrategy(resumeStrategy);
+ }
+
ServiceHelper.startService(delegatedEndpoint, delegatedConsumer);
LOG.info("Leadership taken. Consumer started: {}", delegatedEndpoint);