You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/04/28 19:29:58 UTC

[camel] branch main updated (33807af8f58 -> 944e65b0ae2)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 33807af8f58 CAMEL-18020: fix a possible NPE due to uninitialized consumer in poll exception strategies
     new 6ccf9cc2723 (chores) camel-kafka: use final fields where possible
     new 944e65b0ae2 (chores) camel-kafka: ensures all writes to the last error happen through the setter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/kafka/KafkaFetchRecords.java    | 14 +++++++++-----
 .../org/apache/camel/component/kafka/KafkaProducer.java    |  4 ++--
 .../consumer/errorhandler/ReconnectErrorStrategy.java      |  2 +-
 .../kafka/consumer/errorhandler/StopErrorStrategy.java     |  2 +-
 .../component/kafka/consumer/support/KafkaResumable.java   |  2 +-
 .../resume/kafka/AbstractKafkaResumeStrategy.java          |  4 ++--
 6 files changed, 16 insertions(+), 12 deletions(-)


[camel] 01/02: (chores) camel-kafka: use final fields where possible

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6ccf9cc272359491388cd6f704e1300a12b6ccf1
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 15:57:48 2022 +0200

    (chores) camel-kafka: use final fields where possible
---
 .../src/main/java/org/apache/camel/component/kafka/KafkaProducer.java | 4 ++--
 .../component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java | 2 +-
 .../component/kafka/consumer/errorhandler/StopErrorStrategy.java      | 2 +-
 .../apache/camel/component/kafka/consumer/support/KafkaResumable.java | 2 +-
 .../camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java     | 4 ++--
 5 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 88f1f0a7e81..bcdf40e406e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -520,8 +520,8 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
 class KafkaTransactionSynchronization extends SynchronizationAdapter {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTransactionSynchronization.class);
-    private String transactionId;
-    private Producer kafkaProducer;
+    private final String transactionId;
+    private final Producer kafkaProducer;
 
     public KafkaTransactionSynchronization(String transactionId, Producer kafkaProducer) {
         this.transactionId = transactionId;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
index 0d54cbe0706..77036a9aa07 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/ReconnectErrorStrategy.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 public class ReconnectErrorStrategy implements PollExceptionStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(ReconnectErrorStrategy.class);
-    private KafkaFetchRecords recordFetcher;
+    private final KafkaFetchRecords recordFetcher;
 
     private boolean retry = true;
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
index 6258ad3ac63..2e649614c22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/errorhandler/StopErrorStrategy.java
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory;
 
 public class StopErrorStrategy implements PollExceptionStrategy {
     private static final Logger LOG = LoggerFactory.getLogger(StopErrorStrategy.class);
-    private KafkaFetchRecords recordFetcher;
+    private final KafkaFetchRecords recordFetcher;
     private boolean retry = true;
 
     public StopErrorStrategy(KafkaFetchRecords recordFetcher) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index a210c475bec..388f47fa3e5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -22,7 +22,7 @@ import org.apache.camel.Resumable;
 import org.apache.camel.resume.Offsets;
 
 public class KafkaResumable implements Resumable<String, String> {
-    private String partition;
+    private final String partition;
     private String offset;
 
     public KafkaResumable(String partition, String offset) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index 80825eeb76d..9960cecccf4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -66,8 +66,8 @@ public abstract class AbstractKafkaResumeStrategy<K, V>
     private final List<Future<RecordMetadata>> sentItems = new ArrayList<>();
     private final ResumeCache<K, V> resumeCache;
     private boolean subscribed;
-    private Properties producerConfig;
-    private Properties consumerConfig;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
 
     public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) {
         this.topic = topic;


[camel] 02/02: (chores) camel-kafka: ensures all writes to the last error happen through the setter

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 944e65b0ae245e157aa5df2405edaf57c918d63a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Apr 28 16:23:20 2022 +0200

    (chores) camel-kafka: ensures all writes to the last error happen through the setter
---
 .../apache/camel/component/kafka/KafkaFetchRecords.java    | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 0f090a34d41..b1befdfa5c3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -154,7 +154,7 @@ public class KafkaFetchRecords implements Runnable {
                 setConnected(true);
             }
 
-            lastError = null;
+            setLastError(null);
             startPolling();
         } while ((pollExceptionStrategy.canContinue() || isReconnect()) && isKafkaConsumerRunnable());
 
@@ -172,7 +172,7 @@ public class KafkaFetchRecords implements Runnable {
         String msg = "Gave up subscribing org.apache.kafka.clients.consumer.KafkaConsumer " +
                      threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ").";
         LOG.warn(msg);
-        lastError = new KafkaConsumerFatalException(msg, lastError);
+        setLastError(new KafkaConsumerFatalException(msg, lastError));
     }
 
     private void setupCreateConsumerException(ForegroundTask task, int max) {
@@ -180,7 +180,8 @@ public class KafkaFetchRecords implements Runnable {
         String topic = getPrintableTopic();
         String msg = "Gave up creating org.apache.kafka.clients.consumer.KafkaConsumer "
                      + threadId + " to " + topic + " after " + max + " attempts (elapsed: " + time + ").";
-        lastError = new KafkaConsumerFatalException(msg, lastError);
+
+        setLastError(new KafkaConsumerFatalException(msg, lastError));
     }
 
     private boolean initializeConsumerTask() {
@@ -191,7 +192,7 @@ public class KafkaFetchRecords implements Runnable {
             // ensure this is logged so users can see the problem
             LOG.warn("Error subscribing org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(),
                     e);
-            lastError = e;
+            setLastError(e);
             return false;
         }
 
@@ -219,7 +220,7 @@ public class KafkaFetchRecords implements Runnable {
             // ensure this is logged so users can see the problem
             LOG.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due to: {}", e.getMessage(),
                     e);
-            lastError = e;
+            setLastError(e);
             return false;
         }
 
@@ -551,4 +552,7 @@ public class KafkaFetchRecords implements Runnable {
         state = State.RESUME_REQUESTED;
     }
 
+    private synchronized void setLastError(Exception lastError) {
+        this.lastError = lastError;
+    }
 }