Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/24 03:20:31 UTC
nifi git commit: NIFI-3962: This closes #1846. Updated ConsumerLease to better handle batching of messages into a single FlowFile in the same way that it is handled for demarcated data
Repository: nifi
Updated Branches:
refs/heads/master fb925fc18 -> 2c751a8e5
NIFI-3962: This closes #1846. Updated ConsumerLease to better handle batching of messages into a single FlowFile in the same way that it is handled for demarcated data
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2c751a8e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2c751a8e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2c751a8e
Branch: refs/heads/master
Commit: 2c751a8e5bdf784513fabf1da029bb12cd076732
Parents: fb925fc
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 23 15:08:37 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue May 23 23:20:13 2017 -0400
----------------------------------------------------------------------
.../processors/kafka/pubsub/ConsumerLease.java | 190 +++++++++++--------
1 file changed, 113 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
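The heart of this change is visible in the new BundleInformation class at the bottom of the diff below: the bundle map is now keyed on a (TopicPartition, RecordSchema) pair rather than on TopicPartition alone, so records arriving across multiple poll() calls are appended to the same FlowFile whenever both the partition and the schema match. The following is a minimal, self-contained sketch of that keying strategy; the Key, Bundle, and append names are illustrative stand-ins for the commit's BundleInformation/BundleTracker, not NiFi APIs.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class BundleKeyingSketch {

    // Illustrative stand-in for BundleInformation: a map key combining the
    // source partition with the record schema, so records are appended to an
    // existing bundle only when both match.
    static final class Key {
        final String topic;
        final int partition;
        final String schemaId; // stand-in for NiFi's RecordSchema

        Key(final String topic, final int partition, final String schemaId) {
            this.topic = topic;
            this.partition = partition;
            this.schemaId = schemaId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition, schemaId);
        }

        @Override
        public boolean equals(final Object obj) {
            if (!(obj instanceof Key)) {
                return false;
            }
            final Key other = (Key) obj;
            return partition == other.partition
                    && Objects.equals(topic, other.topic)
                    && Objects.equals(schemaId, other.schemaId);
        }
    }

    // Illustrative stand-in for the per-bundle state held by BundleTracker.
    static final class Bundle {
        final StringBuilder content = new StringBuilder();
        long recordCount = 0;
    }

    private final Map<Key, Bundle> bundleMap = new HashMap<>();

    // Append a record to its bundle, creating the bundle the first time a key is seen.
    void append(final String topic, final int partition, final String schemaId, final String record) {
        final Key key = new Key(topic, partition, schemaId);
        Bundle bundle = bundleMap.get(key);
        if (bundle == null) {
            bundle = new Bundle();
            bundleMap.put(key, bundle);
        }
        bundle.content.append(record).append('\n');
        bundle.recordCount++;
    }

    public static void main(final String[] args) {
        final BundleKeyingSketch sketch = new BundleKeyingSketch();
        // Two records sharing a partition and schema are appended to one bundle...
        sketch.append("events", 0, "schema-v1", "{\"id\":1}");
        sketch.append("events", 0, "schema-v1", "{\"id\":2}");
        // ...while a different schema on the same partition starts a second bundle.
        sketch.append("events", 0, "schema-v2", "{\"id\":3}");
        System.out.println("bundles: " + sketch.bundleMap.size()); // prints: bundles: 2
    }
}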
http://git-wip-us.apache.org/repos/asf/nifi/blob/2c751a8e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index effd2e4..ee6b1ff 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -23,15 +23,16 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
import java.io.ByteArrayInputStream;
import java.io.Closeable;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.DatatypeConverter;
@@ -56,7 +57,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.Tuple;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -79,11 +79,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
- private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+ private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
private long leaseStartNanos = -1;
private boolean lastPollEmpty = false;
- private int totalFlowFiles = 0;
+ private int totalMessages = 0;
ConsumerLease(
final long maxWaitMillis,
@@ -115,7 +115,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
uncommittedOffsetsMap.clear();
leaseStartNanos = -1;
lastPollEmpty = false;
- totalFlowFiles = 0;
+ totalMessages = 0;
}
/**
@@ -206,6 +206,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
kafkaConsumer.commitSync(offsetsMap);
resetInternalState();
return true;
+ } catch (final IOException ioe) {
+ poison();
+ logger.error("Failed to finish writing out FlowFile bundle", ioe);
+ throw new ProcessException(ioe);
} catch (final KafkaException kex) {
poison();
logger.warn("Duplicates are likely as we were able to commit the process"
@@ -253,7 +257,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
return false;
} else {
- return totalFlowFiles < 15000;//admittedlly a magic number - good candidate for processor property
+ return totalMessages < 1000;//admittedly a magic number - good candidate for processor property
}
}
@@ -315,12 +319,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
- totalFlowFiles += messages.size();
messages.stream().forEach(message -> {
writeData(getProcessSession(), message, partition);
});
}
+ totalMessages += messages.size();
uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
}
});
@@ -340,15 +344,36 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
}
- private Collection<FlowFile> getBundles() {
+ private Collection<FlowFile> getBundles() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
for (final BundleTracker tracker : bundleMap.values()) {
- populateAttributes(tracker);
+ processBundle(tracker);
flowFiles.add(tracker.flowFile);
}
return flowFiles;
}
+ private void processBundle(final BundleTracker bundle) throws IOException {
+ final RecordSetWriter writer = bundle.recordWriter;
+ if (writer != null) {
+ final WriteResult writeResult;
+
+ try {
+ writeResult = writer.finishRecordSet();
+ } finally {
+ writer.close();
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+ bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+ }
+
+ populateAttributes(bundle);
+ }
+
private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
FlowFile flowFile = session.create();
final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
@@ -364,7 +389,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
final boolean demarcateFirstRecord;
- BundleTracker tracker = bundleMap.get(topicPartition);
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, null);
+ BundleTracker tracker = bundleMap.get(bundleInfo);
FlowFile flowFile;
if (tracker == null) {
tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
@@ -388,39 +414,22 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
});
tracker.updateFlowFile(flowFile);
- bundleMap.put(topicPartition, tracker);
+ bundleMap.put(bundleInfo, tracker);
}
- private void rollback(final TopicPartition topicPartition) {
- OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
- if (offsetAndMetadata == null) {
- offsetAndMetadata = kafkaConsumer.committed(topicPartition);
- }
-
- final long offset = offsetAndMetadata.offset();
- kafkaConsumer.seek(topicPartition, offset);
- }
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
- if (records.isEmpty()) {
- return;
- }
-
- final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
-
// In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
// We don't want to create a new FlowFile for each record that we receive, so we will just create
// a "temporary flowfile" that will be removed in the finally block below and use that to pass to
// the createRecordReader method.
final FlowFile tempFlowFile = session.create();
- try {
- final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
- while (itr.hasNext()) {
- final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
- final InputStream in = new ByteArrayInputStream(consumerRecord.value());
+ RecordSetWriter writer = null;
+ try {
+ for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
final Record record;
- try {
+ try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
record = reader.nextRecord();
} catch (final Exception e) {
@@ -445,10 +454,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
continue;
}
+ // Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
- Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
- if (tuple == null) {
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
FlowFile flowFile = session.create();
final OutputStream rawOut = session.write(flowFile);
@@ -468,20 +479,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
- final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
+ writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
writer.beginRecordSet();
- tuple = new Tuple<>(flowFile, writer);
- writers.put(recordSchema, tuple);
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
}
- final RecordSetWriter writer = tuple.getValue();
writer.write(record);
+ tracker.incrementRecordCount(1L);
}
+
+ session.adjustCounter("Records Received", records.size(), false);
} catch (final Exception e) {
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (final Exception ioe) {
+ logger.warn("Failed to close Record Writer", ioe);
+ }
+
+ try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
@@ -491,50 +515,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} finally {
session.remove(tempFlowFile);
}
+ }
- for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
- FlowFile flowFile = tuple.getKey();
- final RecordSetWriter writer = tuple.getValue();
-
- final WriteResult writeResult;
- try {
- writeResult = writer.finishRecordSet();
- writer.close();
- } catch (final Exception e) {
- logger.error("Failed to finish writing records to Content Repository", e);
- try {
- rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
- throw new ProcessException(e);
- }
-
- final int recordCount = writeResult.getRecordCount();
- if (recordCount > 0) {
- final Map<String, String> attributes = new HashMap<>();
- attributes.putAll(writeResult.getAttributes());
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.put("record.count", String.valueOf(recordCount));
-
- attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
- attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
- session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
- session.adjustCounter("Records Received", recordCount, false);
- session.transfer(flowFile, REL_SUCCESS);
- } else {
- session.remove(flowFile);
- }
+ private void rollback(final TopicPartition topicPartition) {
+ OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+ if (offsetAndMetadata == null) {
+ offsetAndMetadata = kafkaConsumer.committed(topicPartition);
}
+
+ final long offset = offsetAndMetadata.offset();
+ kafkaConsumer.seek(topicPartition, offset);
}
+
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
@@ -544,7 +539,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+ // reading/writing records, then use "kafka.count" attribute.
+ if (tracker.recordWriter == null) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ } else {
+ kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+ }
}
final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
@@ -559,13 +560,19 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final int partition;
final String topic;
final String key;
+ final RecordSetWriter recordWriter;
FlowFile flowFile;
long totalRecords = 0;
private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this(initialRecord, topicPartition, keyEncoding, null);
+ }
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
this.initialOffset = initialRecord.offset();
this.partition = topicPartition.partition();
this.topic = topicPartition.topic();
+ this.recordWriter = recordWriter;
this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
}
@@ -579,4 +586,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
+ private static class BundleInformation {
+ private final TopicPartition topicPartition;
+ private final RecordSchema schema;
+
+ public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema) {
+ this.topicPartition = topicPartition;
+ this.schema = schema;
+ }
+
+ @Override
+ public int hashCode() {
+ return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BundleInformation)) {
+ return false;
+ }
+ final BundleInformation other = (BundleInformation) obj;
+ return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema);
+ }
+ }
}
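One behavioral detail worth noting from the populateAttributes change above: when a RecordSetWriter was used for the bundle, the message count is written to "record.count" (to stay consistent with other record-oriented processors), and only demarcated/raw bundles keep the "kafka.count" attribute. A minimal sketch of that decision follows; the method and class names are illustrative, not the NiFi code itself.

import java.util.HashMap;
import java.util.Map;

public class CountAttributeSketch {
    static final String KAFKA_COUNT = "kafka.count"; // stand-in for KafkaProcessorUtils.KAFKA_COUNT

    // Mirrors the branch in populateAttributes: the count attribute is only set
    // when the bundle holds more than one record, and its name depends on
    // whether a record writer was used.
    static Map<String, String> countAttribute(final long totalRecords, final boolean usedRecordWriter) {
        final Map<String, String> attrs = new HashMap<>();
        if (totalRecords > 1) {
            attrs.put(usedRecordWriter ? "record.count" : KAFKA_COUNT, String.valueOf(totalRecords));
        }
        return attrs;
    }

    public static void main(final String[] args) {
        System.out.println(countAttribute(42, true));  // {record.count=42}
        System.out.println(countAttribute(42, false)); // {kafka.count=42}
        System.out.println(countAttribute(1, true));   // {} - single records get no count attribute
    }
}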