Posted to commits@nifi.apache.org by jg...@apache.org on 2022/03/08 11:42:48 UTC

[nifi] branch main updated: NIFI-9771: When a Kafka record is obtained during config verification, we should produce an invalid response if the Record Reader is not able to produce any records from it

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e6229ab  NIFI-9771: When a Kafka record is obtained during config verification, we should produce an invalid response if the Record Reader is not able to produce any records from it
e6229ab is described below

commit e6229ab938571f5339805797b6a6ffeebcfad652
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Mar 7 11:54:37 2022 -0500

    NIFI-9771: When a Kafka record is obtained during config verification, we should produce an invalid response if the Record Reader is not able to produce any records from it
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5847.
---
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 57 +++++++++++++---------
 1 file changed, 33 insertions(+), 24 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 6c4cc49..e8e0311 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -24,9 +24,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.serialization.RecordReader;
@@ -403,52 +403,61 @@ public class ConsumerPool implements Closeable {
         final Map<String, Integer> recordsPerTopic = new HashMap<>();
 
         for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
-            recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
             final Map<String, String> attributes = consumerLease.getAttributes(consumerRecord);
 
+            int numRecords = 0;
             final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
             try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
                 final RecordReader reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
                 while (reader.nextRecord() != null) {
+                    numRecords++;
                 }
             } catch (final Exception e) {
                 parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
                 latestParseFailureDescription.put(consumerRecord.topic(), e.toString());
             }
+
+            if (numRecords == 0) {
+                parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
+                latestParseFailureDescription.put(consumerRecord.topic(), "Received Kafka message but Record Reader produced no Record from it");
+                recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
+            } else {
+                recordsPerTopic.merge(consumerRecord.topic(), numRecords, Integer::sum);
+            }
         }
 
         // Note here that we do not commit the offsets. We will just let the consumer close without committing the offsets, which
         // will roll back the consumption of the messages.
-        if (recordsPerTopic.isEmpty()) {
-            return new ConfigVerificationResult.Builder()
-                .verificationStepName("Parse Records")
-                .outcome(Outcome.SKIPPED)
-                .explanation("Received no messages to attempt parsing within the 30 second timeout")
-                .build();
-        }
-
         if (parseFailuresPerTopic.isEmpty()) {
+            if (recordsPerTopic.isEmpty()) {
+                return new ConfigVerificationResult.Builder()
+                    .verificationStepName("Parse Records")
+                    .outcome(Outcome.SKIPPED)
+                    .explanation("Received no messages to attempt parsing within the 30 second timeout")
+                    .build();
+            }
+
             return new ConfigVerificationResult.Builder()
                 .verificationStepName("Parse Records")
                 .outcome(Outcome.SUCCESSFUL)
                 .explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic)
                 .build();
-        } else {
-            final Map<String, String> failureDescriptions = new HashMap<>();
-            for (final String topic : recordsPerTopic.keySet()) {
-                final int records = recordsPerTopic.get(topic);
-                final Integer failures = parseFailuresPerTopic.get(topic);
-                final String failureReason = latestParseFailureDescription.get(topic);
-                final String description = "Failed to parse " + failures + " out of " + records + " records. Sample failure reason: " + failureReason;
-                failureDescriptions.put(topic, description);
-            }
+        }
 
-            return new ConfigVerificationResult.Builder()
-                .verificationStepName("Parse Records")
-                .outcome(Outcome.FAILED)
-                .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions)
-                .build();
+        final Map<String, String> failureDescriptions = new HashMap<>();
+        for (final String topic : recordsPerTopic.keySet()) {
+            final int records = recordsPerTopic.get(topic);
+            final Integer failures = parseFailuresPerTopic.get(topic);
+            final String failureReason = latestParseFailureDescription.get(topic);
+            final String description = "Failed to parse " + failures + " out of " + records + " records. Sample failure reason: " + failureReason;
+            failureDescriptions.put(topic, description);
         }
+
+        return new ConfigVerificationResult.Builder()
+            .verificationStepName("Parse Records")
+            .outcome(Outcome.FAILED)
+            .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions)
+            .build();
     }
 
     /**
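
For illustration, a minimal, self-contained sketch of the per-topic counting behavior described in the commit message (this is not the ConsumerPool code; the Message type and parse method are hypothetical stand-ins, not NiFi APIs):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified sketch of the counting logic introduced by the commit above.
    public class VerifyParseCountsSketch {

        // Hypothetical stand-in for a consumed Kafka message.
        record Message(String topic, byte[] value) {}

        // Hypothetical parser: returns how many records it could read from the bytes.
        static int parse(byte[] value) {
            return value.length == 0 ? 0 : 1;
        }

        public static void main(String[] args) {
            final Map<String, Integer> recordsPerTopic = new HashMap<>();
            final Map<String, Integer> parseFailuresPerTopic = new HashMap<>();

            final List<Message> consumed = List.of(
                new Message("events", new byte[] {1, 2, 3}),  // yields one record
                new Message("events", new byte[0]));          // yields no records

            for (final Message message : consumed) {
                final int numRecords = parse(message.value());
                if (numRecords == 0) {
                    // A message that produced no records counts as one consumed record
                    // and as a parse failure, so verification reports FAILED instead of
                    // silently reporting success.
                    parseFailuresPerTopic.merge(message.topic(), 1, Integer::sum);
                    recordsPerTopic.merge(message.topic(), 1, Integer::sum);
                } else {
                    recordsPerTopic.merge(message.topic(), numRecords, Integer::sum);
                }
            }

            final String outcome = parseFailuresPerTopic.isEmpty()
                ? (recordsPerTopic.isEmpty() ? "SKIPPED" : "SUCCESSFUL")
                : "FAILED";
            System.out.println(outcome + " - records per topic: " + recordsPerTopic
                + ", parse failures per topic: " + parseFailuresPerTopic);
            // Prints: FAILED - records per topic: {events=2}, parse failures per topic: {events=1}
        }
    }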