You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/10/26 10:03:29 UTC

[camel] branch main updated (e9ef9684f17 -> 756b3c08063)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from e9ef9684f17 (chores) camel-aws2-secrets-manager: disable flaky test on Github
     new b7d9797d62f CAMEL-18649: avoid multiple recursive checks for resumed files
     new 398afcf1596 CAMEL-18649: prevent checking the cache unless fully initialized
     new 756b3c08063 CAMEL-18148: fixed incorrect cache registration

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/file/FileConsumer.java  | 23 +++----
 .../consumer/DirectoryEntriesResumeAdapter.java    |  6 +-
 .../DefaultDirectoryEntriesResumeAdapter.java      | 46 ++++++-------
 .../file/consumer/adapters/DirectoryEntries.java   | 75 ----------------------
 .../adapters/FileResumeAdapterDelegate.java        | 10 +--
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 26 +++++---
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 14 +---
 .../file/FileConsumerResumeStrategyTest.java       | 20 +++---
 8 files changed, 68 insertions(+), 152 deletions(-)
 delete mode 100644 components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java


[camel] 03/03: CAMEL-18148: fixed incorrect cache registration

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 756b3c08063e083b709b6584c8b2f63805998f67
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 17:04:01 2022 +0200

    CAMEL-18148: fixed incorrect cache registration
---
 .../camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index a8e56dcf71a..605b498cd9a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -149,7 +149,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     @Override
     public void updateLastOffset(OffsetKey<?> offsetKey, Offset<?> offset, UpdateCallBack updateCallBack) throws Exception {
         ByteBuffer keyBuffer = offsetKey.serialize();
-        ByteBuffer valueBuffer = offsetKey.serialize();
+        ByteBuffer valueBuffer = offset.serialize();
 
         try {
             writeLock.lock();


[camel] 01/03: CAMEL-18649: avoid multiple recursive checks for resumed files

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b7d9797d62f0c00b02ed767a67118b4cde7322c4
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 14:42:58 2022 +0200

    CAMEL-18649: avoid multiple recursive checks for resumed files
---
 .../apache/camel/component/file/FileConsumer.java  | 23 +++----
 .../consumer/DirectoryEntriesResumeAdapter.java    |  6 +-
 .../DefaultDirectoryEntriesResumeAdapter.java      | 46 ++++++-------
 .../file/consumer/adapters/DirectoryEntries.java   | 75 ----------------------
 .../adapters/FileResumeAdapterDelegate.java        | 10 +--
 .../FileConsumerResumeFromOffsetStrategyTest.java  | 14 +---
 .../file/FileConsumerResumeStrategyTest.java       | 20 +++---
 7 files changed, 51 insertions(+), 143 deletions(-)

diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 6aeec6a7323..0fff815b525 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -33,7 +33,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -107,10 +106,20 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
 
             if (resumeStrategy != null) {
                 ResumeAdapter adapter = resumeStrategy.getAdapter();
+                LOG.trace("Checking the resume adapter: {}", adapter);
                 if (adapter instanceof FileOffsetResumeAdapter) {
+                    LOG.trace("The resume adapter is for offsets: {}", adapter);
                     ((FileOffsetResumeAdapter) adapter).setResumePayload(gf);
                     adapter.resume();
                 }
+
+                if (adapter instanceof DirectoryEntriesResumeAdapter) {
+                    LOG.trace("Running the resume process for file {}", file);
+                    if (((DirectoryEntriesResumeAdapter) adapter).resume(file)) {
+                        LOG.trace("Skipping file {} because it has been marked previously consumed", file);
+                        continue;
+                    }
+                }
             }
 
             if (file.isDirectory()) {
@@ -177,18 +186,6 @@ public class FileConsumer extends GenericFileConsumer<File> implements ResumeAwa
             }
         }
 
-        if (resumeStrategy != null) {
-            ResumeAdapter adapter = resumeStrategy.getAdapter();
-            if (adapter instanceof DirectoryEntriesResumeAdapter) {
-                DirectoryEntries resumeSet = new DirectoryEntries(directory, dirFiles);
-
-                ((DirectoryEntriesResumeAdapter) adapter).setResumePayload(resumeSet);
-                adapter.resume();
-
-                return resumeSet.resumed();
-            }
-        }
-
         return dirFiles;
     }
 
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
index 9d1de1297dd..e7842d33cb2 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
@@ -17,10 +17,8 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
+import java.io.File;
 
 public interface DirectoryEntriesResumeAdapter {
-    default void setResumePayload(DirectoryEntries fileSet) {
-
-    }
+    boolean resume(File file);
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
index 09e33457adb..445913a3bdd 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
@@ -21,29 +21,16 @@ import java.io.File;
 
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An implementation of the {@link FileResumeAdapter} that can be used for resume operations for the file component.
  * This one can be used to manage the resume operations for files within a directory.
  */
 class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter implements DirectoryEntriesResumeAdapter {
-    private DirectoryEntries fileSet;
-
-    private boolean notProcessed(File directory, File file) {
-        FileSet cached = cache.get(directory, FileSet.class);
-        if (cached == null) {
-            return true;
-        }
-
-        return !cached.contains(file);
-    }
-
-    @Override
-    public void setResumePayload(DirectoryEntries fileSet) {
-        assert fileSet != null;
-
-        this.fileSet = fileSet;
-    }
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultDirectoryEntriesResumeAdapter.class);
 
     protected boolean add(Object key, Object offset) {
         if (offset instanceof File) {
@@ -58,16 +45,31 @@ class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter imp
         return true;
     }
 
-    private void resumeDirectoryEntries() {
-        DirectoryEntries.doResume(fileSet, f -> notProcessed(fileSet.getDirectory(), f));
-    }
-
     @Override
     public void resume() {
-        resumeDirectoryEntries();
+        // NO-OP
+    }
+
+    private boolean processed(ResumeCache<File> cache, File directory, File file) {
+        LOG.trace("Checking if file {} with key {} is cached: {}, {}", file, directory);
+        FileSet cached = cache.get(directory, FileSet.class);
+        if (cached == null) {
+            LOG.trace("FileSet is not cached, therefore has not been processed yet");
+            return false;
+        }
+
+        final boolean isCached = cached.contains(file);
+        LOG.trace("FileSet is cached. Checking if it contains {}: {}", file, isCached);
+
+        return isCached;
+    }
+
+    public boolean resume(File file) {
+        return processed(cache, file.getParentFile(), file);
     }
 
     public void deserializeFileEntry(File keyObj, File valueObj) {
+        LOG.trace("Deserializing file key {} with value {}", keyObj, valueObj);
         FileSet fileSet = (FileSet) cache.computeIfAbsent(keyObj, obj -> new FileSet());
 
         fileSet.update(valueObj);
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
deleted file mode 100644
index 5795dc5042e..00000000000
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-import java.util.Objects;
-import java.util.function.Predicate;
-
-import org.apache.camel.support.resume.Resumables;
-
-/**
- * This contains the input/output file set for resume operations.
- */
-public final class DirectoryEntries {
-    private final File directory;
-    private final File[] inputFiles;
-    private File[] outputFiles;
-
-    public DirectoryEntries(File directory, File[] inputFiles) {
-        this.directory = directory;
-        this.inputFiles = Objects.requireNonNull(inputFiles,
-                "A list of input files must be provided for the resume info");
-    }
-
-    /**
-     * Gets the files that should be resumed
-     *
-     * @return an array with the files that should be resumed
-     */
-    public File[] resumed() {
-        return outputFiles;
-    }
-
-    /**
-     * Whether there are resumable files to process
-     *
-     * @return true if there are resumable files or false otherwise
-     */
-    public boolean hasResumables() {
-        if (outputFiles != inputFiles) {
-            return true;
-        }
-
-        return false;
-    }
-
-    public File getDirectory() {
-        return directory;
-    }
-
-    public void setOutputFiles(File[] resumed) {
-        this.outputFiles = resumed;
-    }
-
-    public static void doResume(DirectoryEntries directoryEntries, Predicate<File> resumableCheck) {
-        File[] processed = Resumables.resumeEach(directoryEntries.inputFiles,
-                resumableCheck);
-        directoryEntries.setOutputFiles(processed);
-    }
-}
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
index 79def33b4e3..78bf97643a4 100644
--- a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -45,11 +45,6 @@ public class FileResumeAdapterDelegate
         fileOffsetResumeAdapter.setResumePayload(genericFile);
     }
 
-    @Override
-    public void setResumePayload(DirectoryEntries fileSet) {
-        directoryEntriesResumeAdapter.setResumePayload(fileSet);
-    }
-
     @Override
     public boolean add(OffsetKey<?> key, Offset<?> offset) {
         Object offsetObj = offset.getValue();
@@ -93,4 +88,9 @@ public class FileResumeAdapterDelegate
         fileOffsetResumeAdapter.resume();
         directoryEntriesResumeAdapter.resume();
     }
+
+    @Override
+    public boolean resume(File file) {
+        return directoryEntriesResumeAdapter.resume(file);
+    }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index b29d99d9f51..beca7934914 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
@@ -44,7 +43,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
 
     private static class TestFileResumeAdapter implements FileResumeAdapter, FileOffsetResumeAdapter {
         private GenericFile<File> resumable;
-        private DirectoryEntries fileSet;
 
         @Override
         public void setResumePayload(GenericFile<File> resumable) {
@@ -61,14 +59,6 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
                 resumable.updateLastOffsetValue(3L);
                 resumable = null;
             }
-
-            if (fileSet != null) {
-                DirectoryEntries.doResume(fileSet, f -> !f.getName().equals("resume-from-offset"));
-                LOG.debug("Fileset: {}", fileSet);
-                LOG.debug("Fileset: {}", fileSet.resumed());
-
-                fileSet = null;
-            }
         }
     }
 
@@ -81,8 +71,8 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport
         }
 
         @Override
-        public void setResumePayload(DirectoryEntries fileSet) {
-            DirectoryEntries.doResume(fileSet, f -> true);
+        public boolean resume(File file) {
+            return false;
         }
 
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 5241e1571ec..ba8b9446913 100644
--- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -16,25 +16,22 @@
  */
 package org.apache.camel.component.file;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
 import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @DisplayName("Tests whether file consumer works with the resume strategy")
@@ -42,16 +39,17 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
 
     private static class TestFileSetResumeAdapter implements FileResumeAdapter, DirectoryEntriesResumeAdapter {
         private final List<String> processedFiles = Arrays.asList("0.txt", "1.txt", "2.txt");
-        private DirectoryEntries resumeSet;
+        private boolean resumedCalled;
 
         @Override
-        public void setResumePayload(DirectoryEntries resumeSet) {
-            this.resumeSet = Objects.requireNonNull(resumeSet);
+        public void resume() {
+
         }
 
         @Override
-        public void resume() {
-            DirectoryEntries.doResume(resumeSet, f -> !processedFiles.contains(f.getName()));
+        public boolean resume(File file) {
+            resumedCalled = true;
+            return processedFiles.contains(file);
         }
     }
 
@@ -80,9 +78,7 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport {
         // only expect 4 of the 6 sent
         assertMockEndpointsSatisfied();
 
-        assertTrue(adapter.resumeSet.hasResumables(), "The resume set should have resumables in this scenario");
-        assertNotNull(adapter.resumeSet.resumed(), "The list of resumables should not be null");
-        assertEquals(4, adapter.resumeSet.resumed().length, "There should be exactly 4 resumables");
+        assertTrue(adapter.resumedCalled, "The resume set should have resumables in this scenario");
     }
 
     private void setOffset(Exchange exchange) {


[camel] 02/03: CAMEL-18649: prevent checking the cache unless fully initialized

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 398afcf15968a10c8224d7ae233cf631445e1fa2
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 25 16:40:59 2022 +0200

    CAMEL-18649: prevent checking the cache unless fully initialized
---
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 24 ++++++++++++++--------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 6518311f76e..a8e56dcf71a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -67,7 +67,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
     private final ExecutorService executorService;
-    private final ReentrantLock lock = new ReentrantLock();
+    private final ReentrantLock writeLock = new ReentrantLock();
+    private final CountDownLatch initLatch;
 
     /**
      * Builds an instance of this class
@@ -77,6 +78,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         executorService = Executors.newSingleThreadExecutor();
+
+        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -88,6 +91,8 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
                                          ExecutorService executorService) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
         this.executorService = executorService;
+
+        initLatch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
     }
 
     /**
@@ -147,10 +152,10 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         ByteBuffer valueBuffer = offsetKey.serialize();
 
         try {
-            lock.lock();
+            writeLock.lock();
             produce(keyBuffer.array(), valueBuffer.array(), updateCallBack);
         } finally {
-            lock.unlock();
+            writeLock.unlock();
         }
 
         doAdd(offsetKey, offset);
@@ -165,12 +170,14 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
 
-        CountDownLatch latch = new CountDownLatch(resumeStrategyConfiguration.getMaxInitializationRetries());
-        executorService.submit(() -> refresh(latch));
+        executorService.submit(() -> refresh(initLatch));
+    }
 
+    private void waitForInitialization() {
         try {
             LOG.trace("Waiting for kafka resume strategy async initialization");
-            if (!latch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(), TimeUnit.MILLISECONDS)) {
+            if (!initLatch.await(resumeStrategyConfiguration.getMaxInitializationDuration().toMillis(),
+                    TimeUnit.MILLISECONDS)) {
                 LOG.debug("The initialization timed out");
             }
             LOG.trace("Kafka resume strategy initialization complete");
@@ -335,6 +342,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
 
     @Override
     public ResumeAdapter getAdapter() {
+        waitForInitialization();
         return adapter;
     }
 
@@ -367,7 +375,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
     public void stop() {
         try {
             LOG.trace("Trying to obtain a lock for closing the producer");
-            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
+            if (!writeLock.tryLock(1, TimeUnit.SECONDS)) {
                 LOG.warn("Failed to obtain a lock for closing the producer. Force closing the producer ...");
             }
 
@@ -376,7 +384,7 @@ public class SingleNodeKafkaResumeStrategy implements KafkaResumeStrategy {
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } finally {
-            lock.unlock();
+            writeLock.unlock();
         }
 
         try {