You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/26 10:03:31 UTC
[camel] 02/03: CAMEL-18649: prevent checking the cache unless fully initialized
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 398afcf15968a10c8224d7ae233cf631445e1fa2
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 16:40:59 2022 +0200
CAMEL-18649: prevent checking the cache unless fully initialized
---
.../kafka/SingleNodeKafkaResumeStrategy.java | 24 ++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 6518311f76e..a8e56dcf71a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -67,7 +67,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
private ResumeAdapter adapter;
private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
private final ExecutorService executorService;
- private final ReentrantLock lock = new ReentrantLock();
+ private final ReentrantLock writeLock = new ReentrantLock();
+ private final CountDownLatch initLatch;
/**
* Builds an instance of this class
@@ -77,6 +78,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
executorService = Executors.newSingleThreadExecutor();
+
+ initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
}
/**
@@ -88,6 +91,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
ExecutorService executorService) {
this.resumeStrategyConfiguration = resumeStrategyConfiguration;
this.executorService = executorService;
+
+ initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
}
/**
@@ -147,10 +152,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
ByteBuffer valueBuffer = offsetKey.serialize();
try {
- lock.lock();
+ writeLock.lock();
produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
} finally {
- lock.unlock();
+ writeLock.unlock();
}
doAdd(offsetKey, offset);
@@ -165,12 +170,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
}
- CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
- executorService.submit(() -> refresh(latch));
+ executorService.submit(() -> refresh(initLatch));
+ }
+ private void waitForInitialization() {
try {
LOG.trace("Waiting for kafka resume strategy async initialization");
- if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+ if (!initLatch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(),
+ TimeUnit.MILLISECONDS)) {
LOG.debug("The initialization timed out");
}
LOG.trace("Kafka resume strategy initialization complete");
@@ -335,6 +342,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
@Override
public ResumeAdapter getAdapter() {
+ waitForInitialization();
return adapter;
}
@@ -367,7 +375,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
public void stop() {
try {
LOG.trace("Trying to obtain a lock for closing the producer");
- if (!lock.tryLock(1, TimeUnit.SECONDS)) {
+ if (!writeLock.tryLock(1, TimeUnit.SECONDS)) {
LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
}
@@ -376,7 +384,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
- lock.unlock();
+ writeLock.unlock();
}
try {