Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/15 12:49:36 UTC
nifi git commit: NIFI-5592: If an Exception is thrown by RecordReader.read() from ConsumeKafkaRecord, route Record to parse.failure relationship
Repository: nifi
Updated Branches:
refs/heads/master 5a84d650c -> c8fc1327e
NIFI-5592: If an Exception is thrown by RecordReader.read() from ConsumeKafkaRecord, route Record to parse.failure relationship
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #3001.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c8fc1327
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c8fc1327
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c8fc1327
Branch: refs/heads/master
Commit: c8fc1327eea046b03f47180334eeeb603bc4ea39
Parents: 5a84d65
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Sep 13 13:56:13 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Sat Sep 15 14:49:24 2018 +0200
----------------------------------------------------------------------
.../processors/kafka/pubsub/ConsumerLease.java | 134 ++++++++++---------
.../processors/kafka/pubsub/ConsumerLease.java | 134 ++++++++++---------
2 files changed, 140 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
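The change below wraps the record-reading loop in a try/catch so that an IOException, MalformedRecordException, or SchemaValidationException thrown while parsing one Kafka message diverts that single message to the 'parse.failure' relationship instead of failing the whole batch. As a minimal, self-contained sketch of that pattern (using hypothetical stand-in types such as RawMessage and SimpleReader in place of NiFi's actual RecordReader and ProcessSession, not the real implementation), it might look like this:

import java.util.ArrayList;
import java.util.List;

/**
 * A minimal sketch of the pattern introduced by this commit. A parse
 * failure in one message no longer aborts the batch; the raw message is
 * diverted to a parse-failure bucket instead.
 */
public class ParseFailureRoutingSketch {

    /** Stand-in for a raw Kafka message. */
    record RawMessage(String payload) {}

    /** Stand-in for org.apache.nifi.serialization.MalformedRecordException. */
    static class MalformedRecordException extends Exception {
        MalformedRecordException(String message) { super(message); }
    }

    /** Stand-in for RecordReader: nextRecord() returns null at end of input. */
    static class SimpleReader {
        private final String[] fields;
        private int index;

        SimpleReader(RawMessage msg) {
            this.fields = msg.payload().split(",", -1);
        }

        String nextRecord() throws MalformedRecordException {
            if (index >= fields.length) {
                return null;
            }
            final String field = fields[index++];
            if (field.isBlank()) {
                throw new MalformedRecordException("blank record in: " + String.join(",", fields));
            }
            return field;
        }
    }

    public static void main(String[] args) {
        final List<RawMessage> batch = List.of(
                new RawMessage("a,b,c"),
                new RawMessage("d,,e"),   // malformed: routed to parse.failure
                new RawMessage("f"));

        final List<String> success = new ArrayList<>();
        final List<RawMessage> parseFailure = new ArrayList<>();

        for (final RawMessage msg : batch) {
            final SimpleReader reader = new SimpleReader(msg);
            try {
                // As in the commit, the whole read loop sits inside the try
                // block, so a failure affects only the current message.
                String record;
                while ((record = reader.nextRecord()) != null) {
                    success.add(record);
                }
            } catch (final MalformedRecordException e) {
                // Analogous to handleParseFailure(consumerRecord, session, e):
                // divert the raw message and keep consuming the batch.
                parseFailure.add(msg);
            }
        }

        // Records parsed before the failure ("d") have already been accepted,
        // matching the commit, where they were already written to the bundle.
        System.out.println("success: " + success);            // [a, b, c, d, f]
        System.out.println("parse.failure: " + parseFailure); // [RawMessage[payload=d,,e]]
    }
}

----------------------------------------------------------------------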
http://git-wip-us.apache.org/repos/asf/nifi/blob/c8fc1327/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 4d9a5b6..815ed18 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -16,30 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import javax.xml.bind.DatatypeConverter;
-
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -54,14 +30,39 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -530,51 +531,56 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
continue;
}
- Record record;
- while ((record = reader.nextRecord()) != null) {
- // Determine the bundle for this record.
- final RecordSchema recordSchema = record.getSchema();
- final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
- BundleTracker tracker = bundleMap.get(bundleInfo);
- if (tracker == null) {
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- final OutputStream rawOut = session.write(flowFile);
+ try {
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ rollback(topicPartition);
+ yield();
+
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
- final RecordSchema writeSchema;
try {
- writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
- } catch (final Exception e) {
- logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
- rollback(topicPartition);
- yield();
-
- throw new ProcessException(e);
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ continue;
}
- writer = writerFactory.createWriter(logger, writeSchema, rawOut);
- writer.beginRecordSet();
-
- tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
- tracker.updateFlowFile(flowFile);
- bundleMap.put(bundleInfo, tracker);
- } else {
- writer = tracker.recordWriter;
- }
-
- try {
- writer.write(record);
- } catch (final RuntimeException re) {
- handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
- + "Will route message as its own FlowFile to the 'parse.failure' relationship");
- continue;
+ tracker.incrementRecordCount(1L);
+ session.adjustCounter("Records Received", 1L, false);
}
-
- tracker.incrementRecordCount(1L);
- session.adjustCounter("Records Received", 1L, false);
+ } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+ handleParseFailure(consumerRecord, session, e);
+ continue;
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/c8fc1327/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 2e7e2d4..ac88a4c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -16,30 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
-import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
-import java.io.ByteArrayInputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import javax.xml.bind.DatatypeConverter;
-
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -54,14 +30,39 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
+import javax.xml.bind.DatatypeConverter;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
@@ -530,51 +531,56 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
continue;
}
- Record record;
- while ((record = reader.nextRecord()) != null) {
- // Determine the bundle for this record.
- final RecordSchema recordSchema = record.getSchema();
- final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
- BundleTracker tracker = bundleMap.get(bundleInfo);
- if (tracker == null) {
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- final OutputStream rawOut = session.write(flowFile);
+ try {
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ rollback(topicPartition);
+ yield();
+
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
- final RecordSchema writeSchema;
try {
- writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
- } catch (final Exception e) {
- logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
- rollback(topicPartition);
- yield();
-
- throw new ProcessException(e);
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ continue;
}
- writer = writerFactory.createWriter(logger, writeSchema, rawOut);
- writer.beginRecordSet();
-
- tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
- tracker.updateFlowFile(flowFile);
- bundleMap.put(bundleInfo, tracker);
- } else {
- writer = tracker.recordWriter;
- }
-
- try {
- writer.write(record);
- } catch (final RuntimeException re) {
- handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
- + "Will route message as its own FlowFile to the 'parse.failure' relationship");
- continue;
+ tracker.incrementRecordCount(1L);
+ session.adjustCounter("Records Received", 1L, false);
}
-
- tracker.incrementRecordCount(1L);
- session.adjustCounter("Records Received", 1L, false);
+ } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
+ handleParseFailure(consumerRecord, session, e);
+ continue;
}
}
}
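----------------------------------------------------------------------
In both files, the new catch blocks delegate to handleParseFailure, a method that already exists in these classes and whose body is not part of this diff. As a rough, hypothetical reconstruction of the kind of work it performs (illustrative only, not the actual NiFi implementation), it would write the raw Kafka message bytes into their own FlowFile and transfer it to REL_PARSE_FAILURE. The sketch below assumes it sits inside ConsumerLease, where logger, REL_PARSE_FAILURE, and the imports shown in the diff are already in scope:

// Hypothetical reconstruction of handleParseFailure; the real body is not
// shown in this commit. ProcessSession, FlowFile, and ConsumerRecord are the
// real NiFi/Kafka classes, but the logic here is illustrative only.
private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord,
                                final ProcessSession session, final Exception cause) {
    handleParseFailure(consumerRecord, session, cause,
        "Failed to parse message from Kafka. Will route message as its own FlowFile "
        + "to the 'parse.failure' relationship");
}

private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord,
                                final ProcessSession session, final Exception cause, final String message) {
    // Write the raw Kafka message, untouched, into its own FlowFile.
    FlowFile failureFlowFile = session.create();
    final byte[] value = consumerRecord.value();
    if (value != null) {
        failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
    }
    failureFlowFile = session.putAttribute(failureFlowFile, "kafka.topic", consumerRecord.topic());

    session.transfer(failureFlowFile, REL_PARSE_FAILURE);
    session.adjustCounter("Parse Failures", 1L, false);
    logger.error(message, cause);
}

This matches the two call shapes visible in the diff: the writer-side failure passes an explicit message, while the reader-side catch uses the three-argument form.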