You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/11/10 07:56:06 UTC
[camel] branch main updated: CAMEL-18678: ensure strategies can access the Camel context
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new e3d6f82b2d7 CAMEL-18678: ensure strategies can access the Camel context
e3d6f82b2d7 is described below
commit e3d6f82b2d7d13ad1e853759036485e4d5db7abc
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Nov 9 16:43:31 2022 +0100
CAMEL-18678: ensure strategies can access the Camel context
---
.../kafka/SingleNodeKafkaResumeStrategy.java | 40 +++++++++++++++++-----
.../component/wal/WriteAheadResumeStrategy.java | 23 +++++++++++--
.../org/apache/camel/reifier/ResumableReifier.java | 5 +++
3 files changed, 57 insertions(+), 11 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 32ee0a4af97..660b7f278d8 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -26,10 +26,11 @@ import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Cacheable;
import org.apache.camel.resume.Deserializable;
@@ -59,7 +60,7 @@ import org.slf4j.LoggerFactory;
* integrations.
*/
@JdkService("kafka-resume-strategy")
-public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
+public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
private Consumer<byte[], byte[]> consumer;
@@ -69,12 +70,13 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
private boolean subscribed;
private ResumeAdapter adapter;
private KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
- private final ExecutorService executorService;
+ private ExecutorService executorService;
private final ReentrantLock writeLock = new ReentrantLock();
private CountDownLatch initLatch;
+ private CamelContext camelContext;
public SingleNodeKafkaResumeStrategy() {
- executorService = Executors.newSingleThreadExecutor();
+
}
/**
@@ -84,7 +86,6 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
*/
public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
- executorService = Executors.newSingleThreadExecutor();
}
/**
@@ -174,6 +175,11 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
}
initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
+ if (executorService == null) {
+ executorService
+ = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "SingleNodeKafkaResumeStrategy");
+ }
+
executorService.submit(() -> refresh(initLatch));
}
@@ -397,11 +403,17 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
try {
LOG.info("Closing the Kafka consumer");
consumer.wakeup();
- executorService.shutdown();
- if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
- LOG.warn("Kafka consumer did not shutdown within 2 seconds");
- executorService.shutdownNow();
+ if (executorService != null) {
+ executorService.shutdown();
+
+ if (!executorService.awaitTermination(2, TimeUnit.SECONDS)) {
+ LOG.warn("Kafka consumer did not shutdown within 2 seconds");
+ executorService.shutdownNow();
+ }
+ } else {
+ // This may happen if the start up has failed in some other part
+ LOG.trace("There's no executor service to shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -447,4 +459,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
return resumeStrategyConfiguration;
}
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
}
diff --git a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
index 2bbf92ca3a1..a34e670424c 100644
--- a/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
+++ b/components/camel-wal/src/main/java/org/apache/camel/component/wal/WriteAheadResumeStrategy.java
@@ -20,7 +20,10 @@ package org.apache.camel.component.wal;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.ScheduledExecutorService;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.resume.Deserializable;
import org.apache.camel.resume.Offset;
@@ -45,7 +48,7 @@ import org.slf4j.LoggerFactory;
* recovered
*/
@JdkService("write-ahead-resume-strategy")
-public class WriteAheadResumeStrategy implements ResumeStrategy {
+public class WriteAheadResumeStrategy implements ResumeStrategy, CamelContextAware {
/**
* An update callback that works for this strategy as well as for the delegate resume strategy that is wrapped in
@@ -72,6 +75,7 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
private LogWriter logWriter;
private ResumeStrategy resumeStrategy;
private WriteAheadResumeStrategyConfiguration resumeStrategyConfiguration;
+ private CamelContext camelContext;
/**
* Creates a new write-ahead resume strategy
@@ -305,7 +309,12 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
this.logFile = resumeStrategyConfiguration.getLogFile();
this.resumeStrategy = resumeStrategyConfiguration.getDelegateResumeStrategy();
- DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(resumeStrategyConfiguration.getSupervisorInterval());
+ final ScheduledExecutorService executorService = camelContext.getExecutorServiceManager()
+ .newScheduledThreadPool(this, "SingleNodeKafkaResumeStrategy", 1);
+
+ DefaultLogSupervisor flushPolicy = new DefaultLogSupervisor(
+ resumeStrategyConfiguration.getSupervisorInterval(),
+ executorService);
logWriter = new LogWriter(logFile, flushPolicy);
} catch (Exception e) {
throw new RuntimeCamelException(e);
@@ -334,4 +343,14 @@ public class WriteAheadResumeStrategy implements ResumeStrategy {
public ResumeStrategyConfiguration getResumeStrategyConfiguration() {
return resumeStrategyConfiguration;
}
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
}
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
index 2db23aeee80..9c6a05c909f 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -18,6 +18,7 @@ package org.apache.camel.reifier;
import java.util.Optional;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
@@ -44,6 +45,10 @@ public class ResumableReifier extends ProcessorReifier<ResumableDefinition> {
ResumeStrategy resumeStrategy = resolveResumeStrategy();
ObjectHelper.notNull(resumeStrategy, ResumeStrategy.DEFAULT_NAME, definition);
+ if (resumeStrategy instanceof CamelContextAware) {
+ ((CamelContextAware) resumeStrategy).setCamelContext(camelContext);
+ }
+
route.setResumeStrategy(resumeStrategy);
LoggingLevel loggingLevel = resolveLoggingLevel();