Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/24 03:20:31 UTC

nifi git commit: NIFI-3962: This closes #1846. Updated ConsumerLease to better handle batching of messages into a single FlowFile in the same way that it is handled for demarcated data

Repository: nifi
Updated Branches:
  refs/heads/master fb925fc18 -> 2c751a8e5


NIFI-3962: This closes #1846. Updated ConsumerLease to better handle batching of messages into a single FlowFile in the same way that it is handled for demarcated data


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2c751a8e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2c751a8e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2c751a8e

Branch: refs/heads/master
Commit: 2c751a8e5bdf784513fabf1da029bb12cd076732
Parents: fb925fc
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 23 15:08:37 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue May 23 23:20:13 2017 -0400

----------------------------------------------------------------------
 .../processors/kafka/pubsub/ConsumerLease.java  | 190 +++++++++++--------
 1 file changed, 113 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
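
The heart of the change: bundleMap is re-keyed from TopicPartition to a new
(TopicPartition, RecordSchema) pair (the BundleInformation class added at the
bottom of the diff), so record-oriented output can append same-schema messages
from one partition to a single FlowFile across successive poll calls, just as
demarcated data already did. Below is a minimal, self-contained sketch of that
keying idea; the BundleKey and BundleKeyDemo names are illustrative stand-ins,
not NiFi classes.

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative stand-in for BundleInformation: a composite map key of
// (partition, schema). Equal keys must produce equal hash codes so that
// HashMap lookups find the bundle that is already open.
class BundleKey {
    final int partition;
    final String schemaName; // stands in for RecordSchema; null is allowed,
                             // as it is for demarcated (schema-less) bundles

    BundleKey(final int partition, final String schemaName) {
        this.partition = partition;
        this.schemaName = schemaName;
    }

    @Override
    public int hashCode() {
        return Objects.hash(partition, schemaName);
    }

    @Override
    public boolean equals(final Object obj) {
        if (!(obj instanceof BundleKey)) {
            return false;
        }
        final BundleKey other = (BundleKey) obj;
        return partition == other.partition
                && Objects.equals(schemaName, other.schemaName);
    }
}

public class BundleKeyDemo {
    public static void main(final String[] args) {
        final Map<BundleKey, StringBuilder> bundles = new HashMap<>();
        // Two same-schema records from partition 0 land in one bundle...
        bundles.computeIfAbsent(new BundleKey(0, "person"), k -> new StringBuilder()).append("rec1;");
        bundles.computeIfAbsent(new BundleKey(0, "person"), k -> new StringBuilder()).append("rec2;");
        // ...while a different schema on the same partition starts a second.
        bundles.computeIfAbsent(new BundleKey(0, "order"), k -> new StringBuilder()).append("rec3;");
        System.out.println(bundles.size()); // prints 2
    }
}

A schema change therefore starts a new bundle rather than a new FlowFile per
message, which is what keeps the FlowFile count down when a topic carries
records of more than one schema.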


http://git-wip-us.apache.org/repos/asf/nifi/blob/2c751a8e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index effd2e4..ee6b1ff 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -23,15 +23,16 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
 
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import javax.xml.bind.DatatypeConverter;
@@ -56,7 +57,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.util.Tuple;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -79,11 +79,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private boolean poisoned = false;
     //used for tracking demarcated flowfiles to their TopicPartition so we can append
     //to them on subsequent poll calls
-    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
     private long leaseStartNanos = -1;
     private boolean lastPollEmpty = false;
-    private int totalFlowFiles = 0;
+    private int totalMessages = 0;
 
     ConsumerLease(
             final long maxWaitMillis,
@@ -115,7 +115,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         uncommittedOffsetsMap.clear();
         leaseStartNanos = -1;
         lastPollEmpty = false;
-        totalFlowFiles = 0;
+        totalMessages = 0;
     }
 
     /**
@@ -206,6 +206,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             kafkaConsumer.commitSync(offsetsMap);
             resetInternalState();
             return true;
+        } catch (final IOException ioe) {
+            poison();
+            logger.error("Failed to finish writing out FlowFile bundle", ioe);
+            throw new ProcessException(ioe);
         } catch (final KafkaException kex) {
             poison();
             logger.warn("Duplicates are likely as we were able to commit the process"
@@ -253,7 +257,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
             return false;
         } else {
-            return totalFlowFiles < 15000;//admittedlly a magic number - good candidate for processor property
+            return totalMessages < 1000;//admittedly a magic number - good candidate for processor property
         }
     }
 
@@ -315,12 +319,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 } else if (readerFactory != null && writerFactory != null) {
                     writeRecordData(getProcessSession(), messages, partition);
                 } else {
-                    totalFlowFiles += messages.size();
                     messages.stream().forEach(message -> {
                         writeData(getProcessSession(), message, partition);
                     });
                 }
 
+                totalMessages += messages.size();
                 uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
             }
         });
@@ -340,15 +344,36 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
     }
 
-    private Collection<FlowFile> getBundles() {
+    private Collection<FlowFile> getBundles() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         for (final BundleTracker tracker : bundleMap.values()) {
-            populateAttributes(tracker);
+            processBundle(tracker);
             flowFiles.add(tracker.flowFile);
         }
         return flowFiles;
     }
 
+    private void processBundle(final BundleTracker bundle) throws IOException {
+        final RecordSetWriter writer = bundle.recordWriter;
+        if (writer != null) {
+            final WriteResult writeResult;
+
+            try {
+                writeResult = writer.finishRecordSet();
+            } finally {
+                writer.close();
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+            bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+        }
+
+        populateAttributes(bundle);
+    }
+
     private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
         FlowFile flowFile = session.create();
         final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
@@ -364,7 +389,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final boolean demarcateFirstRecord;
-        BundleTracker tracker = bundleMap.get(topicPartition);
+        final BundleInformation bundleInfo = new BundleInformation(topicPartition, null);
+        BundleTracker tracker = bundleMap.get(bundleInfo);
         FlowFile flowFile;
         if (tracker == null) {
             tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
@@ -388,39 +414,22 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             }
         });
         tracker.updateFlowFile(flowFile);
-        bundleMap.put(topicPartition, tracker);
+        bundleMap.put(bundleInfo, tracker);
     }
 
-    private void rollback(final TopicPartition topicPartition) {
-        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
-        if (offsetAndMetadata == null) {
-            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
-        }
-
-        final long offset = offsetAndMetadata.offset();
-        kafkaConsumer.seek(topicPartition, offset);
-    }
 
     private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        if (records.isEmpty()) {
-            return;
-        }
-
-        final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
-
         // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
         // We don't want to create a new FlowFile for each record that we receive, so we will just create
         // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
         // the createRecordReader method.
         final FlowFile tempFlowFile = session.create();
-        try {
-            final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
-            while (itr.hasNext()) {
-                final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
-                final InputStream in = new ByteArrayInputStream(consumerRecord.value());
+        RecordSetWriter writer = null;
 
+        try {
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
                 final Record record;
-                try {
+                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
                     final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
                     record = reader.nextRecord();
                 } catch (final Exception e) {
@@ -445,10 +454,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     continue;
                 }
 
+                // Determine the bundle for this record.
                 final RecordSchema recordSchema = record.getSchema();
+                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
 
-                Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
-                if (tuple == null) {
+                BundleTracker tracker = bundleMap.get(bundleInfo);
+                if (tracker == null) {
                     FlowFile flowFile = session.create();
                     final OutputStream rawOut = session.write(flowFile);
 
@@ -468,20 +479,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         throw new ProcessException(e);
                     }
 
-                    final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
+                    writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
                     writer.beginRecordSet();
 
-                    tuple = new Tuple<>(flowFile, writer);
-                    writers.put(recordSchema, tuple);
+                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                    tracker.updateFlowFile(flowFile);
+                    bundleMap.put(bundleInfo, tracker);
+                } else {
+                    writer = tracker.recordWriter;
                 }
 
-                final RecordSetWriter writer = tuple.getValue();
                 writer.write(record);
+                tracker.incrementRecordCount(1L);
             }
+
+            session.adjustCounter("Records Received", records.size(), false);
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
 
             try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } catch (final Exception ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
+            try {
                 rollback(topicPartition);
             } catch (final Exception rollbackException) {
                 logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
@@ -491,50 +515,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         } finally {
             session.remove(tempFlowFile);
         }
+    }
 
-        for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
-            FlowFile flowFile = tuple.getKey();
-            final RecordSetWriter writer = tuple.getValue();
-
-            final WriteResult writeResult;
-            try {
-                writeResult = writer.finishRecordSet();
-                writer.close();
-            } catch (final Exception e) {
-                logger.error("Failed to finish writing records to Content Repository", e);
-                try {
-                    rollback(topicPartition);
-                } catch (final Exception rollbackException) {
-                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                }
-                throw new ProcessException(e);
-            }
-
-            final int recordCount = writeResult.getRecordCount();
-            if (recordCount > 0) {
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.putAll(writeResult.getAttributes());
-                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
-                attributes.put("record.count", String.valueOf(recordCount));
-
-                attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
-                flowFile = session.putAllAttributes(flowFile, attributes);
-
-                final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
-                session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
 
-                session.adjustCounter("Records Received", recordCount, false);
-                session.transfer(flowFile, REL_SUCCESS);
-            } else {
-                session.remove(flowFile);
-            }
+    private void rollback(final TopicPartition topicPartition) {
+        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+        if (offsetAndMetadata == null) {
+            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
         }
+
+        final long offset = offsetAndMetadata.offset();
+        kafkaConsumer.seek(topicPartition, offset);
     }
 
 
+
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
@@ -544,7 +539,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+            // reading/writing records, then use "kafka.count" attribute.
+            if (tracker.recordWriter == null) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            } else {
+                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+            }
         }
         final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
         final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
@@ -559,13 +560,19 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final int partition;
         final String topic;
         final String key;
+        final RecordSetWriter recordWriter;
         FlowFile flowFile;
         long totalRecords = 0;
 
         private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this(initialRecord, topicPartition, keyEncoding, null);
+        }
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
             this.initialOffset = initialRecord.offset();
             this.partition = topicPartition.partition();
             this.topic = topicPartition.topic();
+            this.recordWriter = recordWriter;
             this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
         }
 
@@ -579,4 +586,33 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
     }
 
+    private static class BundleInformation {
+        private final TopicPartition topicPartition;
+        private final RecordSchema schema;
+
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema) {
+            this.topicPartition = topicPartition;
+            this.schema = schema;
+        }
+
+        @Override
+        public int hashCode() {
+            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode());
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof BundleInformation)) {
+                return false;
+            }
+            final BundleInformation other = (BundleInformation) obj;
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema);
+        }
+    }
 }
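
At commit time each tracked bundle is finished exactly once: processBundle()
calls finishRecordSet() to flush the record set and collect its attributes,
closes the writer in a finally block so the underlying output stream is
released even if finishing fails, and then puts the collected attributes plus
the writer's MIME type on the FlowFile. A minimal sketch of that finish-once
pattern, under an assumed RecordSink interface standing in for NiFi's
RecordSetWriter:

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for RecordSetWriter: finish once, always close.
interface RecordSink extends Closeable {
    Map<String, String> finishRecordSet() throws IOException; // result attributes

    String getMimeType();
}

public class FinishOncePattern {
    // Mirrors processBundle(): finish the record set, close the sink even if
    // finishing fails, then merge the result attributes with the MIME type.
    static Map<String, String> finishBundle(final RecordSink sink) throws IOException {
        final Map<String, String> resultAttributes;
        try {
            resultAttributes = sink.finishRecordSet();
        } finally {
            sink.close();
        }

        final Map<String, String> attributes = new HashMap<>(resultAttributes);
        attributes.put("mime.type", sink.getMimeType());
        return attributes;
    }

    public static void main(final String[] args) throws IOException {
        final RecordSink demo = new RecordSink() {
            @Override
            public Map<String, String> finishRecordSet() {
                final Map<String, String> attrs = new HashMap<>();
                attrs.put("record.count", "2");
                return attrs;
            }

            @Override
            public String getMimeType() {
                return "application/json";
            }

            @Override
            public void close() {
            }
        };

        // Prints the merged attributes, e.g. {mime.type=application/json, record.count=2}
        System.out.println(finishBundle(demo));
    }
}

If finishRecordSet() throws, the new IOException catch in commit() poisons the
lease and rethrows as a ProcessException, so a partially written bundle is
never transferred downstream.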