You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/06/30 08:57:05 UTC
nifi git commit: NIFI-4046: If we are unable to parse out any records
from a Kafka Mesaage with ConsumeKafkaRecord,
then we should route all of the bytes received to 'parse.failure'
Repository: nifi
Updated Branches:
refs/heads/master 58a623dfa -> cdc154f7c
NIFI-4046: If we are unable to parse out any records from a Kafka Mesaage with ConsumeKafkaRecord, then we should route all of the bytes received to 'parse.failure'
NIFI-4046: Addressed issue of Record Writer failing with ConsumeKafkaRecord
This closes #1906.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cdc154f7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cdc154f7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cdc154f7
Branch: refs/heads/master
Commit: cdc154f7c84a0e5e28d30cc8ee0d49ee5f8892ce
Parents: 58a623d
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jun 9 09:02:08 2017 -0400
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Jun 30 17:56:14 2017 +0900
----------------------------------------------------------------------
.../processors/kafka/pubsub/ConsumerLease.java | 74 ++++++++++++++------
1 file changed, 53 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc154f7/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 242c917..9b24bc7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -347,13 +347,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private Collection<FlowFile> getBundles() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
- processBundle(tracker);
- flowFiles.add(tracker.flowFile);
+ final boolean includeBundle = processBundle(tracker);
+ if (includeBundle) {
+ flowFiles.add(tracker.flowFile);
+ }
}
return flowFiles;
}
- private void processBundle(final BundleTracker bundle) throws IOException {
+ private boolean processBundle(final BundleTracker bundle) throws IOException {
final RecordSetWriter writer = bundle.recordWriter;
if (writer != null) {
final WriteResult writeResult;
@@ -364,6 +366,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
writer.close();
}
+ if (writeResult.getRecordCount() == 0) {
+ getProcessSession().remove(bundle.flowFile);
+ return false;
+ }
+
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@@ -372,6 +379,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
populateAttributes(bundle);
+ return true;
}
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
@@ -417,6 +425,35 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
bundleMap.put(bundleInfo, tracker);
}
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+ handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ }
+
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+ // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+ FlowFile failureFlowFile = session.create();
+ failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+ failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+ session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+ session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+ if (cause == null) {
+ logger.error(message);
+ } else {
+ logger.error(message, cause);
+ }
+
+ session.adjustCounter("Parse Failures", 1, false);
+ }
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
// In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
@@ -433,24 +470,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
record = reader.nextRecord();
} catch (final Exception e) {
- // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
- attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
- attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
- FlowFile failureFlowFile = session.create();
- failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
- failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
- final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
- session.getProvenanceReporter().receive(failureFlowFile, transitUri);
-
- session.transfer(failureFlowFile, REL_PARSE_FAILURE);
- logger.error("Failed to parse message from Kafka using the configured Record Reader. "
- + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
+ handleParseFailure(consumerRecord, session, e);
+ continue;
+ }
- session.adjustCounter("Parse Failures", 1, false);
+ if (record == null) {
+ handleParseFailure(consumerRecord, session, null);
continue;
}
@@ -489,7 +514,14 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
writer = tracker.recordWriter;
}
- writer.write(record);
+ try {
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ continue;
+ }
+
tracker.incrementRecordCount(1L);
}