Posted to commits@nifi.apache.org by ij...@apache.org on 2017/06/30 08:57:05 UTC

nifi git commit: NIFI-4046: If we are unable to parse out any records from a Kafka Message with ConsumeKafkaRecord, then we should route all of the bytes received to 'parse.failure'

Repository: nifi
Updated Branches:
  refs/heads/master 58a623dfa -> cdc154f7c


NIFI-4046: If we are unable to parse out any records from a Kafka Message with ConsumeKafkaRecord, then we should route all of the bytes received to 'parse.failure'

NIFI-4046: Addressed issue of Record Writer failing with ConsumeKafkaRecord

This closes #1906.

Signed-off-by: Koji Kawamura <ij...@apache.org>
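
For context: the change below funnels three cases into the new 'parse.failure'
handling in writeRecordData() — the Record Reader throwing while parsing a Kafka
message, the reader returning no record at all (the NIFI-4046 case), and the
Record Writer failing on write. What follows is a minimal, self-contained sketch
of that routing decision only; the RecordReader interface and all names here are
plain-Java stand-ins for the NiFi types, not the committed code (see the diff
below for that).

    public class ParseFailureRoutingSketch {

        enum Route { SUCCESS, PARSE_FAILURE }

        // Stand-in for NiFi's RecordReader; only nextRecord() matters here.
        interface RecordReader {
            Object nextRecord() throws Exception;
        }

        // Mirrors the decision flow in writeRecordData() below: an exception
        // from the reader and a null record are both routed to 'parse.failure'.
        static Route routeFor(final RecordReader reader) {
            final Object record;
            try {
                record = reader.nextRecord();
            } catch (final Exception e) {
                return Route.PARSE_FAILURE; // reader could not parse the bytes
            }
            if (record == null) {
                return Route.PARSE_FAILURE; // no records parsed at all (the NIFI-4046 case)
            }
            return Route.SUCCESS; // writer failures are routed the same way in the diff
        }

        public static void main(final String[] args) {
            System.out.println(routeFor(() -> null));         // PARSE_FAILURE
            System.out.println(routeFor(() -> new Object())); // SUCCESS
        }
    }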


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cdc154f7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cdc154f7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cdc154f7

Branch: refs/heads/master
Commit: cdc154f7c84a0e5e28d30cc8ee0d49ee5f8892ce
Parents: 58a623d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jun 9 09:02:08 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Jun 30 17:56:14 2017 +0900

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  | 74 ++++++++++++++------
 1 file changed, 53 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc154f7/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 242c917..9b24bc7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -347,13 +347,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private Collection<FlowFile> getBundles() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         for (final BundleTracker tracker : bundleMap.values()) {
-            processBundle(tracker);
-            flowFiles.add(tracker.flowFile);
+            final boolean includeBundle = processBundle(tracker);
+            if (includeBundle) {
+                flowFiles.add(tracker.flowFile);
+            }
         }
         return flowFiles;
     }
 
-    private void processBundle(final BundleTracker bundle) throws IOException {
+    private boolean processBundle(final BundleTracker bundle) throws IOException {
         final RecordSetWriter writer = bundle.recordWriter;
         if (writer != null) {
             final WriteResult writeResult;
@@ -364,6 +366,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 writer.close();
             }
 
+            if (writeResult.getRecordCount() == 0) {
+                getProcessSession().remove(bundle.flowFile);
+                return false;
+            }
+
             final Map<String, String> attributes = new HashMap<>();
             attributes.putAll(writeResult.getAttributes());
             attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@@ -372,6 +379,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
 
         populateAttributes(bundle);
+        return true;
     }
 
     private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
@@ -417,6 +425,35 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         bundleMap.put(bundleInfo, tracker);
     }
 
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+        FlowFile failureFlowFile = session.create();
+        failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            logger.error(message);
+        } else {
+            logger.error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
 
     private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
@@ -433,24 +470,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
                     record = reader.nextRecord();
                 } catch (final Exception e) {
-                    // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-                    attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                    attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
-                    FlowFile failureFlowFile = session.create();
-                    failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
-                    failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
-                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
-                    session.getProvenanceReporter().receive(failureFlowFile, transitUri);
-
-                    session.transfer(failureFlowFile, REL_PARSE_FAILURE);
-                    logger.error("Failed to parse message from Kafka using the configured Record Reader. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
+                    handleParseFailure(consumerRecord, session, e);
+                    continue;
+                }
 
-                    session.adjustCounter("Parse Failures", 1, false);
+                if (record == null) {
+                    handleParseFailure(consumerRecord, session, null);
                     continue;
                 }
 
@@ -489,7 +514,14 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     writer = tracker.recordWriter;
                 }
 
-                writer.write(record);
+                try {
+                    writer.write(record);
+                } catch (final RuntimeException re) {
+                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                    continue;
+                }
+
                 tracker.incrementRecordCount(1L);
             }