You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/03/01 18:24:17 UTC

[camel] 11/16: CAMEL-15562: minor cleanups for the Kafka strategy

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 616b50d4851e712ae563e337e7f12c155697b1c9
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Mon Feb 28 13:51:57 2022 +0100

    CAMEL-15562: minor cleanups for the Kafka strategy
---
 .../resume/kafka/AbstractKafkaResumeStrategy.java  | 37 ++++++++++++++--------
 1 file changed, 24 insertions(+), 13 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index a779c00..079d936 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -87,7 +87,13 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         init();
     }
 
-    private Properties createProducer(String bootstrapServers) {
+    /**
+     * Creates the configuration properties for a basic string-based producer
+     * 
+     * @param  bootstrapServers the Kafka host
+     * @return                  A set of default properties for producing string-based key/value records to Kafka
+     */
+    public static Properties createProducer(String bootstrapServers) {
         Properties config = new Properties();
 
         config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
@@ -96,15 +102,16 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
         StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
 
-        // set up the producer to remove all batching on send, we want all sends
-        // to be fully synchronous
-        config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
-        config.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
-
         return config;
     }
 
-    private Properties createConsumer(String bootstrapServers) {
+    /**
+     * Creates the configuration properties for a basic string-based consumer
+     * 
+     * @param  bootstrapServers the Kafka host
+     * @return                  A set of default properties for consuming string-based key/value records from Kafka
+     */
+    public static Properties createConsumer(String bootstrapServers) {
         Properties config = new Properties();
 
         config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -124,13 +131,12 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
     /**
      * Sends data to a topic
-     *
-     * @param  topic                the topic to send data to
+     * 
      * @param  message              the message to send
      * @throws ExecutionException
      * @throws InterruptedException
      */
-    public void produce(String topic, K key, V message) throws ExecutionException, InterruptedException {
+    public void produce(K key, V message) throws ExecutionException, InterruptedException {
         ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, message);
 
         errorCount = 0;
@@ -151,7 +157,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
         LOG.debug("Updating offset on Kafka with key {} to {}", key, offsetValue);
 
-        produce(topic, key, offsetValue);
+        produce(key, offsetValue);
 
         resumeCache.add(key, offsetValue);
     }
@@ -231,9 +237,14 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
 
     public abstract void subscribe() throws Exception;
 
-    // TODO: bad method
     public void unsubscribe() {
-        consumer.unsubscribe();
+        try {
+            consumer.unsubscribe();
+        } catch (IllegalStateException e) {
+            LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic);
+        } catch (Exception e) {
+            LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e);
+        }
     }
 
     /**