You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/10 07:56:06 UTC

[camel] branch main updated: CAMEL-18678: ensure strategies can access the Camel context

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new e3d6f82b2d7 CAMEL-18678: ensure strategies can access the Camel context
e3d6f82b2d7 is described below

commit e3d6f82b2d7d13ad1e853759036485e4d5db7abc
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 9 16:43:31 2022 +0100

    CAMEL-18678: ensure strategies can access the Camel context
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 40 +++++++++++++++++-----
 .../component/wal/WriteAheadResumeStrategy.java    | 23 +++++++++++--
 .../org/apache/camel/reifier/ResumableReifier.java |  5 +++
 3 files changed, 57 insertions(+), 11 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 32ee0a4af97..660b7f278d8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -26,10 +26,11 @@ import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Cacheable;
 import org.apache.camel.resume.Deserializable;
@@ -59,7 +60,7 @@ import org.slf4j.LoggerFactory;
  * integrations.
  */
 @JdkService("kafka-resume-strategy")
-public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
+public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, CamelContextAware {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
     private Consumer<byte[], byte[]> consumer;
@@ -69,12 +70,13 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     private boolean subscribed;
     private ResumeAdapter adapter;
     private KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
-    private final ExecutorService executorService;
+    private ExecutorService executorService;
     private final ReentrantLock writeLock = new ReentrantLock();
     private CountDownLatch initLatch;
+    private CamelContext camelContext;
 
     public SingleNodeKafkaResumeStrategy() {
-        executorService = Executors.newSingleThreadExecutor();
+
     }
 
     /**
@@ -84,7 +86,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
-        executorService = Executors.newSingleThreadExecutor();
     }
 
     /**
@@ -174,6 +175,11 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         }
 
         initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+        if (executorService == null) {
+            executorService
+                    = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "SingleNodeKafkaResumeStrategy");
+        }
+
         executorService.submit(() -> refresh(initLatch));
     }
 
@@ -397,11 +403,17 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         try {
             LOG.info("Closing the Kafka consumer");
             consumer.wakeup();
-            executorService.shutdown();
 
-            if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
-                LOG.warn("Kafka consumer did not shutdown within 2 seconds");
-                executorService.shutdownNow();
+            if (executorService != null) {
+                executorService.shutdown();
+
+                if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+                    LOG.warn("Kafka consumer did not shutdown within 2 seconds");
+                    executorService.shutdownNow();
+                }
+            } else {
+                // This may happen if the start up has failed in some other part
+                LOG.trace("There's no executor service to shutdown");
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -447,4 +459,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
         return resumeStrategyConfiguration;
     }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 }
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
index 2bbf92ca3a1..a34e670424c 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
@@ -20,7 +20,10 @@ package org.apache.camel.component.wal;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Offset;
@@ -45,7 +48,7 @@ import org.slf4j.LoggerFactory;
  * recovered
  */
 @JdkService("write-ahead-resume-strategy")
-public class WriteAheadResumeStrategy implements ResumeStrategy {
+public class WriteAheadResumeStrategy implements ResumeStrategy, CamelContextAware {
 
     /**
      * An update callback that works for this strategy as well as for the delegate resume strategy that is wrapped in
@@ -72,6 +75,7 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
     private LogWriter logWriter;
     private ResumeStrategy resumeStrategy;
     private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
+    private CamelContext camelContext;
 
     /**
      * Creates a new write-ahead resume strategy
@@ -305,7 +309,12 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
             this.logFile = resumeStrategyConfiguration.getLogFile();
             this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
 
-            DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
+            final ScheduledExecutorService executorService = camelContext.getExecutorServiceManager()
+                    .newScheduledThreadPool(this, "WriteAheadResumeStrategy", 1);
+
+            DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(
+                    resumeStrategyConfiguration.getSupervisorInterval(),
+                    executorService);
             logWriter = new LogWriter(logFile, flushPolicy);
         } catch (Exception e) {
             throw new RuntimeCamelException(e);
@@ -334,4 +343,14 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
     public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
         return resumeStrategyConfiguration;
     }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
 }
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index 2db23aeee80..9c6a05c909f 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -18,6 +18,7 @@ package org.apache.camel.reifier;
 
 import java.util.Optional;
 
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Processor;
@@ -44,6 +45,10 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
         ResumeStrategy resumeStrategy = resolveResumeStrategy();
         ObjectHelper.notNull(resumeStrategy, ResumeStrategy.DEFAULT_NAME, definition);
 
+        if (resumeStrategy instanceof CamelContextAware) {
+            ((CamelContextAware) resumeStrategy).setCamelContext(camelContext);
+        }
+
         route.setResumeStrategy(resumeStrategy);
         LoggingLevel loggingLevel = resolveLoggingLevel();