You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/26 10:03:31 UTC

[camel] 02/03: CAMEL-18649: prevent checking the cache unless fully initialized

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 398afcf15968a10c8224d7ae233cf631445e1fa2
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 16:40:59 2022 +0200

    CAMEL-18649: prevent checking the cache unless fully initialized
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 24 ++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 6518311f76e..a8e56dcf71a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -67,7 +67,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
     private final ExecutorService executorService;
-    private final ReentrantLock lock = new ReentrantLock();
+    private final ReentrantLock writeLock = new ReentrantLock();
+    private final CountDownLatch initLatch;
 
     /**
      * Builds an instance of this class
@@ -77,6 +78,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         executorService = Executors.newSingleThreadExecutor();
+
+        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -88,6 +91,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
                                          ExecutorService executorService) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         this.executorService = executorService;
+
+        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -147,10 +152,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         ByteBuffer valueBuffer = offsetKey.serialize();
 
         try {
-            lock.lock();
+            writeLock.lock();
             produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
         } finally {
-            lock.unlock();
+            writeLock.unlock();
         }
 
         doAdd(offsetKey, offset);
@@ -165,12 +170,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
-        executorService.submit(() -> refresh(latch));
+        executorService.submit(() -> refresh(initLatch));
+    }
 
+    private void waitForInitialization() {
         try {
             LOG.trace("Waiting for kafka resume strategy async initialization");
-            if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+            if (!initLatch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(),
+                    TimeUnit.MILLISECONDS)) {
                 LOG.debug("The initialization timed out");
             }
             LOG.trace("Kafka resume strategy initialization complete");
@@ -335,6 +342,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
 
     @Override
     public ResumeAdapter getAdapter() {
+        waitForInitialization();
         return adapter;
     }
 
@@ -367,7 +375,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public void stop() {
         try {
             LOG.trace("Trying to obtain a lock for closing the producer");
-            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
+            if (!writeLock.tryLock(1, TimeUnit.SECONDS)) {
                 LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
             }
 
@@ -376,7 +384,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } finally {
-            lock.unlock();
+            writeLock.unlock();
         }
 
         try {