Posted to commits@camel.apache.org by or...@apache.org on 2022/01/18 12:55:51 UTC

[camel] branch camel-3.14.x updated (b291714 -> 8a7102e)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from b291714  Fixed typo in example code
     new 49af1a9  CAMEL-17489: camel-kafka - Unsubscribe before closing the consumer
     new a696417  CAMEL-17493: ignore safe exceptions when unsubscribing
     new 8a7102e  CAMEL-17509: fix invalid topic info displayed when using topic patterns

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/KafkaFetchRecords.java   | 73 ++++++++++++++++------
 .../support/PartitionAssignmentListener.java       | 11 ++--
 2 files changed, 59 insertions(+), 25 deletions(-)

[camel] 02/03: CAMEL-17493: ignore safe exceptions when unsubscribing

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit a696417867d96b47c436e7405de67bdb647f7ded
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Fri Jan 14 19:25:43 2022 +0100

    CAMEL-17493: ignore safe exceptions when unsubscribing
---
 .../camel/component/kafka/KafkaFetchRecords.java      | 19 +++++++++++++++++--
 1 file changed, 17 insertions(+), 2 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index e1c5b03..f902f28 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -236,12 +236,27 @@ class KafkaFetchRecords implements Runnable {
     }
 
     private void safeUnsubscribe() {
+        final String printableTopic = getPrintableTopic();
+
         try {
             consumer.unsubscribe();
+        } catch (IllegalStateException e) {
+            LOG.warn("The consumer is likely already closed. Skipping the unsubscription from {}", printableTopic);
         } catch (Exception e) {
             kafkaConsumer.getExceptionHandler().handleException(
-                    "Error unsubscribing " + threadId + " from kafka topic " + topicName,
-                    e);
+                    "Error unsubscribing thread " + threadId + " from kafka " + printableTopic, e);
+        }
+    }
+
+    /*
+     * This is only used for presenting log messages that take into consideration that it might be subscribed to a topic
+     * or a topic pattern.
+     */
+    private String getPrintableTopic() {
+        if (topicPattern != null) {
+            return "topic pattern" + topicPattern;
+        } else {
+            return "topic" + topicName;
         }
     }
 

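For context on the change above: kafka-clients throws IllegalStateException when unsubscribe() is called on a consumer that has already been closed, and that is the "safe" exception this commit ignores; any other failure still goes to the error handler. A minimal standalone sketch of the same guard, outside of Camel (class and helper names below are illustrative, not taken from the commit):

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class SafeUnsubscribeSketch {
        // Unsubscribe, treating an already-closed consumer as a non-error.
        static void safeUnsubscribe(KafkaConsumer<?, ?> consumer, String printableTopic) {
            try {
                consumer.unsubscribe();
            } catch (IllegalStateException e) {
                // Thrown by kafka-clients once the consumer has been closed.
                System.out.println("Consumer likely already closed; skipping unsubscribe from " + printableTopic);
            } catch (Exception e) {
                // Anything else is still surfaced, mirroring the exception handler call in the diff.
                System.err.println("Error unsubscribing from " + printableTopic + ": " + e.getMessage());
            }
        }
    }
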
[camel] 01/03: CAMEL-17489: camel-kafka - Unsubscribe before closing the consumer

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 49af1a9ac2b1f0c114dd4b6af4a1246fb77186a9
Author: Rafał Gała <ra...@ing.pl>
AuthorDate: Fri Jan 14 12:22:29 2022 +0100

    CAMEL-17489: camel-kafka - Unsubscribe before closing the consumer
---
 .../main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2ebd8bf..e1c5b03 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -208,6 +208,7 @@ class KafkaFetchRecords implements Runnable {
             // only close if not retry
             if (!isRetrying()) {
                 LOG.debug("Closing consumer {}", threadId);
+                safeUnsubscribe();
                 IOHelper.close(consumer);
             }
         }

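The single call added above establishes the shutdown order that the other two commits in this push refine: leave the subscription cleanly via unsubscribe() before the consumer itself is closed. A hedged sketch of that close path with a plain kafka-clients consumer (broker address, group id and topic name are made-up placeholders, not from the commit):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class CloseOrderSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed broker
            props.put("group.id", "demo-group");                 // illustrative group id
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic"));
            consumer.poll(Duration.ofMillis(100));

            // Shutdown: drop the subscription first, then release the consumer's resources.
            consumer.unsubscribe();
            consumer.close();
        }
    }
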
[camel] 03/03: CAMEL-17509: fix invalid topic info displayed when using topic patterns

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 8a7102e25f8f2f14eacde66328e1acb4f22a510b
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Jan 18 12:13:45 2022 +0100

    CAMEL-17509: fix invalid topic info displayed when using topic patterns
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 57 ++++++++++++++--------
 .../support/PartitionAssignmentListener.java       | 11 +++--
 2 files changed, 43 insertions(+), 25 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index f902f28..db42570 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -101,7 +101,10 @@ class KafkaFetchRecords implements Runnable {
             startPolling();
         } while ((isRetrying() || isReconnect()) && isKafkaConsumerRunnable());
 
-        LOG.info("Terminating KafkaConsumer thread: {} receiving from topic: {}", threadId, topicName);
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Terminating KafkaConsumer thread: {} receiving from {}", threadId, getPrintableTopic());
+        }
+
         safeUnsubscribe();
         IOHelper.close(consumer);
     }
@@ -138,14 +141,16 @@ class KafkaFetchRecords implements Runnable {
 
     private void subscribe() {
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
-                threadId, topicName,
-                kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, this::isRunnable);
+                threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset,
+                this::isRunnable);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
+        }
 
         if (topicPattern != null) {
-            LOG.info("Subscribing {} to topic pattern {}", threadId, topicName);
             consumer.subscribe(topicPattern, listener);
         } else {
-            LOG.info("Subscribing {} to topic {}", threadId, topicName);
             consumer.subscribe(Arrays.asList(topicName.split(",")), listener);
         }
     }
@@ -161,7 +166,10 @@ class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-            LOG.trace("Polling {} from topic: {} with timeout: {}", threadId, topicName, pollTimeoutMs);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
+            }
 
             KafkaRecordProcessor kafkaRecordProcessor = buildKafkaRecordProcessor();
 
@@ -185,20 +193,24 @@ class KafkaFetchRecords implements Runnable {
                     e);
             commit();
 
-            LOG.info("Unsubscribing {} from topic {}", threadId, topicName);
+            LOG.info("Unsubscribing {} from {}", threadId, getPrintableTopic());
             safeUnsubscribe();
             Thread.currentThread().interrupt();
         } catch (WakeupException e) {
             // This is normal: it raises this exception when calling the wakeUp (which happens when we stop)
-            LOG.trace("The kafka consumer was woken up while polling on thread {} for topic {}", threadId, topicName);
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("The kafka consumer was woken up while polling on thread {} for {}", threadId, getPrintableTopic());
+            }
+
             safeUnsubscribe();
         } catch (Exception e) {
             if (LOG.isDebugEnabled()) {
-                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage(), e);
+                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage(), e);
             } else {
-                LOG.warn("Exception {} caught while polling {} from kafka topic {} at offset {}: {}",
-                        e.getClass().getName(), threadId, topicName, lastProcessedOffset, e.getMessage());
+                LOG.warn("Exception {} caught while polling {} from kafka {} at offset {}: {}",
+                        e.getClass().getName(), threadId, getPrintableTopic(), lastProcessedOffset, e.getMessage());
             }
 
             handleAccordingToStrategy(partitionLastOffset, e);
@@ -254,9 +266,9 @@ class KafkaFetchRecords implements Runnable {
      */
     private String getPrintableTopic() {
         if (topicPattern != null) {
-            return "topic pattern" + topicPattern;
+            return "topic pattern " + topicPattern;
         } else {
-            return "topic" + topicName;
+            return "topic " + topicName;
         }
     }
 
@@ -264,13 +276,13 @@ class KafkaFetchRecords implements Runnable {
         processAsyncCommits();
         if (isAutoCommitEnabled()) {
             if ("async".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                LOG.info("Auto commitAsync on stop {} from topic {}", threadId, topicName);
+                LOG.info("Auto commitAsync on stop {} from {}", threadId, getPrintableTopic());
                 consumer.commitAsync();
             } else if ("sync".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                LOG.info("Auto commitSync on stop {} from topic {}", threadId, topicName);
+                LOG.info("Auto commitSync on stop {} from {}", threadId, getPrintableTopic());
                 consumer.commitSync();
             } else if ("none".equals(kafkaConsumer.getEndpoint().getConfiguration().getAutoCommitOnStop())) {
-                LOG.info("Auto commit on stop {} from topic {} is disabled (none)", threadId, topicName);
+                LOG.info("Auto commit on stop {} from {} is disabled (none)", threadId, getPrintableTopic());
             }
         }
     }
@@ -428,7 +440,12 @@ class KafkaFetchRecords implements Runnable {
         Set<TopicPartition> tps = consumer.assignment();
         if (tps != null && partitionLastOffset != -1) {
             long next = partitionLastOffset + 1;
-            LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next, topicName);
+
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", next,
+                        getPrintableTopic());
+            }
+
             for (TopicPartition tp : tps) {
                 consumer.seek(tp, next);
             }
@@ -436,8 +453,8 @@ class KafkaFetchRecords implements Runnable {
             for (TopicPartition tp : tps) {
                 long next = consumer.position(tp) + 1;
                 if (!logged) {
-                    LOG.info("Consumer seeking to next offset {} to continue polling next message from topic: {}", next,
-                            topicName);
+                    LOG.info("Consumer seeking to next offset {} to continue polling next message from {}", next,
+                            getPrintableTopic());
                     logged = true;
                 }
                 consumer.seek(tp, next);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index 07d914a..51854b1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -34,18 +34,16 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
     private static final Logger LOG = LoggerFactory.getLogger(PartitionAssignmentListener.class);
 
     private final String threadId;
-    private final String topicName;
     private final KafkaConfiguration configuration;
     private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private Supplier<Boolean> stopStateSupplier;
 
-    public PartitionAssignmentListener(String threadId, String topicName, KafkaConfiguration configuration,
+    public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
                                        Consumer consumer, Map<String, Long> lastProcessedOffset,
                                        Supplier<Boolean> stopStateSupplier) {
         this.threadId = threadId;
-        this.topicName = topicName;
         this.configuration = configuration;
         this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
@@ -56,12 +54,13 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsRevoked: {} from topic {}", threadId, topicName);
 
         // if camel is stopping, or we are not running
         boolean stopping = stopStateSupplier.get();
 
         for (TopicPartition partition : partitions) {
+            LOG.debug("onPartitionsRevoked: {} from {}", threadId, partition.topic());
+
             String offsetKey = serializeOffsetKey(partition);
             Long offset = lastProcessedOffset.get(offsetKey);
             if (offset == null) {
@@ -84,7 +83,9 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-        LOG.debug("onPartitionsAssigned: {} from topic {}", threadId, topicName);
+        if (LOG.isDebugEnabled()) {
+            partitions.forEach(p -> LOG.debug("onPartitionsAssigned: {} from {}", threadId, p.topic()));
+        }
 
         resumeStrategy.resume(consumer);
     }
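
Taken together with the getPrintableTopic() spacing fix earlier in this commit, the listener now logs the actual topic of each partition instead of the configured topicName, which may be empty or misleading when a topic pattern is in use. A short sketch of the two subscription modes that the printable label has to distinguish (names and the example pattern are illustrative only, not from the commit):

    import java.util.Arrays;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class PrintableTopicSketch {
        // Build a human-readable label depending on which subscription mode is active.
        static String printableTopic(Pattern topicPattern, String topicName) {
            return topicPattern != null
                    ? "topic pattern " + topicPattern
                    : "topic " + topicName;
        }

        static void subscribe(KafkaConsumer<String, String> consumer,
                              Pattern topicPattern, String topicName) {
            if (topicPattern != null) {
                // Pattern mode, e.g. Pattern.compile("orders-.*")
                consumer.subscribe(topicPattern);
            } else {
                // Comma-separated topic list mode, e.g. "orders,invoices"
                consumer.subscribe(Arrays.asList(topicName.split(",")));
            }
            System.out.println("Subscribed to " + printableTopic(topicPattern, topicName));
        }
    }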