Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/15 12:49:36 UTC

nifi git commit: NIFI-5592: If an Exception is thrown by RecordReader.read() from ConsumeKafkaRecord, route Record to parse.failure relationship

Repository: nifi
Updated Branches:
  refs/heads/master 5a84d650c -> c8fc1327e


NIFI-5592: If an Exception is thrown by RecordReader.read() from ConsumeKafkaRecord, route Record to parse.failure relationship

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #3001.
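
In short: the record-reading loop in ConsumerLease is now wrapped in a try/catch, so an IOException, MalformedRecordException, or SchemaValidationException raised while reading a record routes the offending Kafka message to the 'parse.failure' relationship via handleParseFailure() instead of aborting the whole poll. Below is a minimal sketch of that control flow; the Record/RecordReader interfaces and handleParseFailure() here are simplified, illustrative stand-ins for the NiFi classes touched in the diff, not the real signatures.

    import java.io.IOException;

    class ParseFailureRoutingSketch {

        interface Record { }

        /** Simplified stand-in for org.apache.nifi.serialization.RecordReader. */
        interface RecordReader {
            Record nextRecord() throws IOException, MalformedRecordException;
        }

        /** Simplified stand-in for org.apache.nifi.serialization.MalformedRecordException. */
        static class MalformedRecordException extends Exception {
            MalformedRecordException(final String msg) { super(msg); }
        }

        void consume(final RecordReader reader, final byte[] consumerRecord) {
            try {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    write(record); // bundle the record into a FlowFile, as before
                }
            } catch (final IOException | MalformedRecordException e) {
                // New behavior: the raw Kafka message is routed to 'parse.failure'
                // rather than the exception failing the session. The real code also
                // catches SchemaValidationException here.
                handleParseFailure(consumerRecord, e);
            }
        }

        void write(final Record record) {
            // Bundle and write the record via the configured Record Writer.
        }

        void handleParseFailure(final byte[] rawMessage, final Exception cause) {
            // In the real processor this creates a FlowFile from the raw message
            // bytes and transfers it to REL_PARSE_FAILURE.
        }
    }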


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c8fc1327
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c8fc1327
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c8fc1327

Branch: refs/heads/master
Commit: c8fc1327eea046b03f47180334eeeb603bc4ea39
Parents: 5a84d65
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Sep 13 13:56:13 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Sep 15 14:49:24 2018 +0200

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  | 134 ++++++++++---------
 .../processors/kafka/pubsub/ConsumerLease.java  | 134 ++++++++++---------
 2 files changed, 140 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c8fc1327/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 4d9a5b6..815ed18 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -16,30 +16,6 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -54,14 +30,39 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
  * intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -530,51 +531,56 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         continue;
                     }
 
-                    Record record;
-                    while ((record = reader.nextRecord()) != null) {
-                        // Determine the bundle for this record.
-                        final RecordSchema recordSchema = record.getSchema();
-                        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
-                        BundleTracker tracker = bundleMap.get(bundleInfo);
-                        if (tracker == null) {
-                            FlowFile flowFile = session.create();
-                            flowFile = session.putAllAttributes(flowFile, attributes);
-
-                            final OutputStream rawOut = session.write(flowFile);
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            // Determine the bundle for this record.
+                            final RecordSchema recordSchema = record.getSchema();
+                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                            BundleTracker tracker = bundleMap.get(bundleInfo);
+                            if (tracker == null) {
+                                FlowFile flowFile = session.create();
+                                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                final RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                    rollback(topicPartition);
+                                    yield();
+
+                                    throw new ProcessException(e);
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                                writer.beginRecordSet();
+
+                                tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                                tracker.updateFlowFile(flowFile);
+                                bundleMap.put(bundleInfo, tracker);
+                            } else {
+                                writer = tracker.recordWriter;
+                            }
 
-                            final RecordSchema writeSchema;
                             try {
-                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
-                            } catch (final Exception e) {
-                                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
-                                rollback(topicPartition);
-                                yield();
-
-                                throw new ProcessException(e);
+                                writer.write(record);
+                            } catch (final RuntimeException re) {
+                                handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                    + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                                continue;
                             }
 
-                            writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                            writer.beginRecordSet();
-
-                            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                            tracker.updateFlowFile(flowFile);
-                            bundleMap.put(bundleInfo, tracker);
-                        } else {
-                            writer = tracker.recordWriter;
-                        }
-
-                        try {
-                            writer.write(record);
-                        } catch (final RuntimeException re) {
-                            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                            continue;
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", 1L, false);
                         }
-
-                        tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", 1L, false);
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        handleParseFailure(consumerRecord, session, e);
+                        continue;
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/c8fc1327/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 2e7e2d4..ac88a4c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -16,30 +16,6 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import javax.xml.bind.DatatypeConverter;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -54,14 +30,39 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
  * intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -530,51 +531,56 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         continue;
                     }
 
-                    Record record;
-                    while ((record = reader.nextRecord()) != null) {
-                        // Determine the bundle for this record.
-                        final RecordSchema recordSchema = record.getSchema();
-                        final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
-                        BundleTracker tracker = bundleMap.get(bundleInfo);
-                        if (tracker == null) {
-                            FlowFile flowFile = session.create();
-                            flowFile = session.putAllAttributes(flowFile, attributes);
-
-                            final OutputStream rawOut = session.write(flowFile);
+                    try {
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            // Determine the bundle for this record.
+                            final RecordSchema recordSchema = record.getSchema();
+                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                            BundleTracker tracker = bundleMap.get(bundleInfo);
+                            if (tracker == null) {
+                                FlowFile flowFile = session.create();
+                                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                final RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                    rollback(topicPartition);
+                                    yield();
+
+                                    throw new ProcessException(e);
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                                writer.beginRecordSet();
+
+                                tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                                tracker.updateFlowFile(flowFile);
+                                bundleMap.put(bundleInfo, tracker);
+                            } else {
+                                writer = tracker.recordWriter;
+                            }
 
-                            final RecordSchema writeSchema;
                             try {
-                                writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
-                            } catch (final Exception e) {
-                                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
-                                rollback(topicPartition);
-                                yield();
-
-                                throw new ProcessException(e);
+                                writer.write(record);
+                            } catch (final RuntimeException re) {
+                                handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                    + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                                continue;
                             }
 
-                            writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                            writer.beginRecordSet();
-
-                            tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                            tracker.updateFlowFile(flowFile);
-                            bundleMap.put(bundleInfo, tracker);
-                        } else {
-                            writer = tracker.recordWriter;
-                        }
-
-                        try {
-                            writer.write(record);
-                        } catch (final RuntimeException re) {
-                            handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                                + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                            continue;
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", 1L, false);
                         }
-
-                        tracker.incrementRecordCount(1L);
-                        session.adjustCounter("Records Received", 1L, false);
+                    } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+                        handleParseFailure(consumerRecord, session, e);
+                        continue;
                     }
                 }
             }