You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/11/01 16:18:25 UTC

(nifi) branch main updated: NIFI-12194 Added Yield on Exceptions in Kafka Processors

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 75c661bbbe NIFI-12194 Added Yield on Exceptions in Kafka Processors
75c661bbbe is described below

commit 75c661bbbe56a7951974a701921af9da74dd0d68
Author: Paul Grey <gr...@yahoo.com>
AuthorDate: Mon Oct 30 15:15:52 2023 -0400

    NIFI-12194 Added Yield on Exceptions in Kafka Processors
    
    - Catching KafkaException and yielding for publisher lease requests improves behavior when the Processor is unable to connect to Kafka Brokers
    
    This closes #7955
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java     |  6 ++++--
 .../nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java      |  6 ++++--
 .../processors/kafka/pubsub/PublishKafkaRecord_2_6.java     | 13 ++++++++++++-
 .../nifi/processors/kafka/pubsub/PublishKafka_2_6.java      | 13 ++++++++++++-
 4 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 525f621e1f..50fece3b35 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -540,9 +540,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 4421ae92f8..a5c6b15891 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -483,9 +483,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
                     + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
-                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}", lease, kex, kex);
+                getLogger().error("Exception while interacting with Kafka so will close the lease {}", lease, kex);
+                context.yield();
             } catch (final Throwable t) {
-                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}", lease, t, t);
+                getLogger().error("Exception while processing data from kafka so will close the lease {}", lease, t);
+                context.yield();
             } finally {
                 activeLeases.remove(lease);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index af61faeb95..34053d6a3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -18,6 +18,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -505,7 +506,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
         }
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -588,6 +589,16 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements KafkaPu
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private Function<Record, Integer> getPartitioner(final ProcessContext context, final FlowFile flowFile) {
         final String partitionClass = context.getProperty(PARTITION_CLASS).getValue();
         if (RECORD_PATH_PARTITIONING.getValue().equals(partitionClass)) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index b2721a7199..b6b84ce1e0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -439,7 +440,7 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
         final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
-        try (final PublisherLease lease = pool.obtainPublisher()) {
+        try (final PublisherLease lease = obtainPublisher(context, pool)) {
             try {
                 if (useTransactions) {
                     lease.beginTransaction();
@@ -512,6 +513,16 @@ public class PublishKafka_2_6 extends AbstractProcessor implements KafkaPublishC
         }
     }
 
+    private PublisherLease obtainPublisher(final ProcessContext context, final PublisherPool pool) {
+        try {
+            return pool.obtainPublisher();
+        } catch (final KafkaException e) {
+            getLogger().error("Failed to obtain Kafka Producer", e);
+            context.yield();
+            throw e;
+        }
+    }
+
     private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
         final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
         if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {