You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 16:26:39 UTC

[camel] 01/06: CAMEL-18356: consolidated the Kafka resume strategies

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9e832b0428cf7f0101e9be2cb6b9c19f4958b5fd
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Aug 10 16:50:25 2022 +0200

    CAMEL-18356: consolidated the Kafka resume strategies
---
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 64 +---------------------
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 60 ++++++++++++++++----
 2 files changed, 50 insertions(+), 74 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 4cfe256fb3d..82580200f03 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory;
  *
  * @param <K> the type of key
  */
+@Deprecated
 public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNodeKafkaResumeStrategy<K> {
     private static final Logger LOG = LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
-    private final ExecutorService executorService;
 
     /**
      * Create a new instance of this class
@@ -64,68 +64,8 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
 
     public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
                                         ExecutorService executorService) {
-        super(resumeStrategyConfiguration);
-
-        this.executorService = executorService;
-    }
-
-    protected void poll() {
-        poll(getConsumer());
-    }
-
-    protected void poll(Consumer<byte[], byte[]> consumer) {
-        Deserializable deserializable = (Deserializable) getAdapter();
-
-        ConsumerRecords<byte[], byte[]> records;
-        do {
-            records = consume(10, consumer);
-
-            if (records.isEmpty()) {
-                break;
-            }
-
-            for (ConsumerRecord<byte[], byte[]> record : records) {
-                byte[] value = record.value();
-
-                LOG.trace("Read from Kafka: {}", value);
-
-                deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()));
-            }
-        } while (true);
+        super(resumeStrategyConfiguration, executorService);
     }
 
-    @Override
-    public void loadCache() throws Exception {
-        super.loadCache();
 
-        executorService.submit(() -> refresh());
-    }
-
-    /**
-     * Launch a thread to refresh the offsets periodically
-     */
-    private void refresh() {
-        LOG.trace("Creating a offset cache refresher");
-        try {
-            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
-            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
-
-            try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
-                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
-
-                poll(consumer);
-            }
-        } catch (Exception e) {
-            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public void stop() {
-        try {
-            executorService.shutdown();
-        } finally {
-            super.stop();
-        }
-    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 870185127de..db87f65074a 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,13 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.Properties;
 import java.util.Queue;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.resume.Cacheable;
@@ -37,6 +41,7 @@ import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -50,9 +55,7 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A resume strategy that publishes offsets to a Kafka topic. This resume strategy is suitable for single node
- * integrations. For multi-node integrations (i.e: using clusters with the master component check
- * {@link MultiNodeKafkaResumeStrategy}.
- *
+ * integrations.
  */
 public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> {
     private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
@@ -66,15 +69,29 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private boolean subscribed;
     private ResumeAdapter adapter;
     private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
+    private final ExecutorService executorService;
 
     /**
      * Builds an instance of this class
-     * 
+     *
      * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      *
      */
     public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) {
         this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+        executorService = Executors.newSingleThreadExecutor();
+    }
+
+    /**
+     * Builds an instance of this class
+     *
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+     *
+     */
+    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration,
+                                         ExecutorService executorService) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
+        this.executorService = executorService;
     }
 
     /**
@@ -125,7 +142,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Loads the existing data into the cache
-     * 
+     *
      * @throws Exception
      */
     public void loadCache() throws Exception {
@@ -138,9 +155,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable");
         }
-        poll();
 
-        unsubscribe();
+        executorService.submit(() -> refresh());
     }
 
     protected void poll() {
@@ -160,7 +176,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
                 LOG.trace("Read from Kafka: {}", value);
 
                 if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), ByteBuffer.wrap(record.value()))) {
-                    break;
+                    LOG.warn("Deserializar indicates that this is the last record to deserialize");
                 }
             }
         } while (true);
@@ -168,7 +184,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Subscribe to the topic if not subscribed yet
-     * 
+     *
      * @param topic the topic to consume the messages from
      */
     protected void checkAndSubscribe(String topic) {
@@ -181,7 +197,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Subscribe to the topic if not subscribed yet
-     * 
+     *
      * @param topic     the topic to consume the messages from
      * @param remaining the number of messages to rewind from the last offset position (used to fill the cache)
      */
@@ -196,7 +212,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     /**
      * Creates a new consumer rebalance listener. This can be useful for setting the exact Kafka offset when necessary
      * to read a limited amount of messages or customize the resume strategy behavior when a rebalance occurs.
-     * 
+     *
      * @param  remaining the number of remaining messages on the topic to try to collect
      * @return
      */
@@ -252,7 +268,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     /**
      * Consumes message from the topic previously setup
-     * 
+     *
      * @param  retries how many times to retry consuming data from the topic
      * @return         An instance of the consumer records
      */
@@ -349,6 +365,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
         LOG.info("Closing the Kafka consumer");
         IOHelper.close(producer, "Kafka consumer", LOG);
+
+        executorService.shutdown();
     }
 
     @Override
@@ -388,4 +406,22 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return resumeStrategyConfiguration;
     }
 
+    /**
+     * Launch a thread to refresh the offsets periodically
+     */
+    private void refresh() {
+        LOG.trace("Creating a offset cache refresher");
+        try {
+            Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone();
+            prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+
+            try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) {
+                consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
+
+                poll();
+            }
+        } catch (Exception e) {
+            LOG.error("Error while refreshing the local cache: {}", e.getMessage(), e);
+        }
+    }
 }