Posted to commits@nifi.apache.org by jo...@apache.org on 2017/09/23 02:56:37 UTC

[4/5] nifi git commit: NIFI-4201: This closes #2024. Implementation of processors for interacting with Kafka 0.11

http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
new file mode 100644
index 0000000..de19648
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * This class represents a lease to access a Kafka Consumer object. The lease is
+ * intended to be obtained from a ConsumerPool. The lease is closeable to allow
+ * for the clean try-with-resources model whereby non-exceptional cases mean
+ * the lease will be returned to the pool for future use by others. A given
+ * lease may only belong to a single thread at a time.
+ */
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private final RecordSetWriterFactory writerFactory;
+    private final RecordReaderFactory readerFactory;
+    private final Charset headerCharacterSet;
+    private final Pattern headerNamePattern;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalMessages = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final ComponentLog logger,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.logger = logger;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    /**
+     * clears out internal state elements excluding session and consumer as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalMessages = 0;
+    }
+
+    /**
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We'll simply take this to mean that
+     * we need to quickly commit what we've got and will return the consumer to
+     * the pool. This method will be called during the poll() method call of
+     * this class and will be called by the same thread calling poll according
+     * to the Kafka API docs. After this method executes the session and kafka
+     * offsets are committed and this lease is closed.
+     *
+     * @param partitions partitions being reassigned
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
+        commit();
+    }
+
+    /**
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it as
+     * by this point we've committed what we've got and moved on.
+     *
+     * @param partitions topic partition set being reassigned
+     */
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
+     */
+    void poll() {
+        /**
+         * Implementation note:
+         * Even if ConsumeKafka is not scheduled to poll for longer than session.timeout.ms
+         * (defaults to 10 sec), e.g. because downstream connection back-pressure is engaged,
+         * the Kafka consumer sends heartbeats from a background thread.
+         * If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), the Kafka
+         * consumer sends a Leave Group request to the Group Coordinator. When the ConsumeKafka
+         * processor is scheduled again, the Kafka client checks whether this client instance is
+         * still a part of the consumer group. If not, it rejoins before polling messages.
+         * This behavior was introduced via Kafka KIP-62 and is available from Kafka client 0.10.1.0 onward.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final ProcessException pe) {
+            throw pe;
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Notifies Kafka to commit the offsets for the specified topic/partition
+     * pairs to the specified offsets with the given metadata. This can offer
+     * higher performance than the other commitOffsets call as it allows the
+     * kafka client to collect more data from Kafka before committing the
+     * offsets.
+     *
+     * @return true if we committed new data; false if we did nothing and the
+     *         caller should probably yield
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the NiFi session and then the Kafka offsets means we have an
+             * at-least-once guarantee here. If we reversed the order we'd have an
+             * at-most-once guarantee.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+
+            final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap;
+            kafkaConsumer.commitSync(offsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final IOException ioe) {
+            poison();
+            logger.error("Failed to finish writing out FlowFile bundle", ioe);
+            throw new ProcessException(ioe);
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator then we're writing individual flow files
+     * per Kafka message, so we must be very mindful of memory usage for the
+     * flow file objects (not their content) being held in memory. The
+     * content of Kafka messages will be written to the content repository
+     * immediately upon each poll call, but we must still be mindful of how much
+     * memory can be used in each poll call. We will indicate that we should
+     * stop polling if our last poll call produced no new results, if we've been
+     * polling and processing data longer than the specified maximum polling
+     * time, if we have reached our specified max flow file limit, or if a
+     * rebalance has been initiated for one of the partitions we're watching;
+     * otherwise we indicate that polling should continue.
+     *
+     * @return true if should keep polling; false otherwise
+     */
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalMessages < 1000; //admittedly a magic number - a good candidate for a processor property
+        }
+    }
+
+    /**
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once closed the session will be rolled back and the
+     * pool should destroy the underlying consumer. This is useful if due to
+     * external reasons, such as the processor no longer being scheduled, this
+     * lease should be terminated immediately.
+     */
+    private void poison() {
+        poisoned = true;
+    }
+
+    /**
+     * @return true if this lease has been poisoned; false otherwise
+     */
+    boolean isPoisoned() {
+        return poisoned;
+    }
+
+    /**
+     * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+     */
+    public void wakeup() {
+        kafkaConsumer.wakeup();
+    }
+
+    /**
+     * This method is intended to be overridden by the pool that created
+     * this ConsumerLease object. It should ensure that the session used to
+     * create this lease is rolled back and that the underlying kafka consumer
+     * is either returned to the pool for continued use or destroyed if this
+     * lease has been poisoned. It can only be called once. Calling it more than
+     * once can result in undefined and non-threadsafe behavior.
+     */
+    @Override
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    public abstract void yield();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+
+                //write records to content repository and session
+                if (demarcatorBytes != null) {
+                    writeDemarcatedData(getProcessSession(), messages, partition);
+                } else if (readerFactory != null && writerFactory != null) {
+                    writeRecordData(getProcessSession(), messages, partition);
+                } else {
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                }
+
+                totalMessages += messages.size();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
+    private Collection<FlowFile> getBundles() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            final boolean includeBundle = processBundle(tracker);
+            if (includeBundle) {
+                flowFiles.add(tracker.flowFile);
+            }
+        }
+        return flowFiles;
+    }
+
+    private boolean processBundle(final BundleTracker bundle) throws IOException {
+        final RecordSetWriter writer = bundle.recordWriter;
+        if (writer != null) {
+            final WriteResult writeResult;
+
+            try {
+                writeResult = writer.finishRecordSet();
+            } finally {
+                writer.close();
+            }
+
+            if (writeResult.getRecordCount() == 0) {
+                getProcessSession().remove(bundle.flowFile);
+                return false;
+            }
+
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.putAll(writeResult.getAttributes());
+            attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+            bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+        }
+
+        populateAttributes(bundle);
+        return true;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        flowFile = session.write(flowFile, out -> {
+            out.write(record.value());
+        });
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        // Group the Records by their BundleInformation
+        final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
+            .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+
+        for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
+            final BundleInformation bundleInfo = entry.getKey();
+            final List<ConsumerRecord<byte[], byte[]>> recordList = entry.getValue();
+
+            final boolean demarcateFirstRecord;
+
+            BundleTracker tracker = bundleMap.get(bundleInfo);
+
+            FlowFile flowFile;
+            if (tracker == null) {
+                tracker = new BundleTracker(recordList.get(0), topicPartition, keyEncoding);
+                flowFile = session.create();
+                flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
+                tracker.updateFlowFile(flowFile);
+                demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+            } else {
+                demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+            }
+            flowFile = tracker.flowFile;
+
+            tracker.incrementRecordCount(recordList.size());
+            flowFile = session.append(flowFile, out -> {
+                boolean useDemarcator = demarcateFirstRecord;
+                for (final ConsumerRecord<byte[], byte[]> record : recordList) {
+                    if (useDemarcator) {
+                        out.write(demarcatorBytes);
+                    }
+                    out.write(record.value());
+                    useDemarcator = true;
+                }
+            });
+
+            tracker.updateFlowFile(flowFile);
+            bundleMap.put(bundleInfo, tracker);
+        }
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+        final Map<String, String> attributes = getAttributes(consumerRecord);
+        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+        FlowFile failureFlowFile = session.create();
+
+        failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            logger.error(message);
+        } else {
+            logger.error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
+    private Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
+        final Map<String, String> attributes = new HashMap<>();
+        if (headerNamePattern == null) {
+            return attributes;
+        }
+
+        for (final Header header : consumerRecord.headers()) {
+            final String attributeName = header.key();
+            if (headerNamePattern.matcher(attributeName).matches()) {
+                attributes.put(attributeName, new String(header.value(), headerCharacterSet));
+            }
+        }
+
+        return attributes;
+    }
+
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        // For each Kafka message, parse it with the configured Record Reader and append the resulting record
+        // to a per-bundle Record Set Writer, where bundles are keyed by topic/partition, record schema, and any
+        // header-derived attributes. Messages that cannot be parsed or written are routed individually to the
+        // 'parse.failure' relationship via handleParseFailure.
+        RecordSetWriter writer = null;
+        try {
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+                final Map<String, String> attributes = getAttributes(consumerRecord);
+
+                final Record record;
+                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+                    final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger);
+                    record = reader.nextRecord();
+                } catch (final Exception e) {
+                    handleParseFailure(consumerRecord, session, e);
+                    continue;
+                }
+
+                if (record == null) {
+                    handleParseFailure(consumerRecord, session, null);
+                    continue;
+                }
+
+                // Determine the bundle for this record.
+                final RecordSchema recordSchema = record.getSchema();
+                final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                BundleTracker tracker = bundleMap.get(bundleInfo);
+                if (tracker == null) {
+                    FlowFile flowFile = session.create();
+                    flowFile = session.putAllAttributes(flowFile, attributes);
+
+                    final OutputStream rawOut = session.write(flowFile);
+
+                    final RecordSchema writeSchema;
+                    try {
+                        writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                    } catch (final Exception e) {
+                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                        try {
+                            rollback(topicPartition);
+                        } catch (final Exception rollbackException) {
+                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                        }
+
+                        yield();
+                        throw new ProcessException(e);
+                    }
+
+                    writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                    writer.beginRecordSet();
+
+                    tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                    tracker.updateFlowFile(flowFile);
+                    bundleMap.put(bundleInfo, tracker);
+                } else {
+                    writer = tracker.recordWriter;
+                }
+
+                try {
+                    writer.write(record);
+                } catch (final RuntimeException re) {
+                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                    continue;
+                }
+
+                tracker.incrementRecordCount(1L);
+            }
+
+            session.adjustCounter("Records Received", records.size(), false);
+        } catch (final Exception e) {
+            logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+            try {
+                if (writer != null) {
+                    writer.close();
+                }
+            } catch (final Exception ioe) {
+                logger.warn("Failed to close Record Writer", ioe);
+            }
+
+            try {
+                rollback(topicPartition);
+            } catch (final Exception rollbackException) {
+                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+            }
+
+            throw new ProcessException(e);
+        }
+    }
+
+
+    private void rollback(final TopicPartition topicPartition) {
+        OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+        if (offsetAndMetadata == null) {
+            offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+        }
+
+        if (offsetAndMetadata == null) {
+            // Neither an uncommitted nor a previously committed offset is known for this partition, so there
+            // is nothing to seek back to; the consumer's auto.offset.reset policy will apply instead.
+            return;
+        }
+
+        final long offset = offsetAndMetadata.offset();
+        kafkaConsumer.seek(topicPartition, offset);
+    }
+
+
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+            // reading/writing records, then use "kafka.count" attribute.
+            if (tracker.recordWriter == null) {
+                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+            } else {
+                kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+            }
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        final RecordSetWriter recordWriter;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this(initialRecord, topicPartition, keyEncoding, null);
+        }
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.recordWriter = recordWriter;
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
+
+    private static class BundleInformation {
+        private final TopicPartition topicPartition;
+        private final RecordSchema schema;
+        private final Map<String, String> attributes;
+
+        public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+            this.topicPartition = topicPartition;
+            this.schema = schema;
+            this.attributes = attributes;
+        }
+
+        @Override
+        public int hashCode() {
+            return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+        }
+
+        @Override
+        public boolean equals(final Object obj) {
+            if (obj == this) {
+                return true;
+            }
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof BundleInformation)) {
+                return false;
+            }
+
+            final BundleInformation other = (BundleInformation) obj;
+            return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+        }
+    }
+}
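
For orientation, the following is a minimal, hypothetical sketch (not part of this commit) of how a consuming processor's onTrigger might drive a ConsumerLease obtained from a ConsumerPool, following the try-with-resources model described in the class javadoc above. The enclosing class, its pool field, and its scheduled flag are assumptions made purely for illustration.

// Hypothetical usage sketch (not part of this commit): driving a ConsumerLease
// per the try-with-resources model described in the ConsumerLease javadoc.
package org.apache.nifi.processors.kafka.pubsub;

import org.apache.kafka.common.errors.WakeupException;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;

final class ConsumerLeaseUsageSketch {

    private ConsumerPool pool;                 // assumed to be created when the processor is scheduled
    private volatile boolean scheduled = true; // assumed to mirror the processor's scheduled state

    void onTrigger(final ProcessContext context, final ProcessSession session) {
        try (final ConsumerLease lease = pool.obtainConsumer(session, context)) {
            if (lease == null) {
                context.yield();               // no consumer lease available right now
                return;
            }
            // Poll while the lease indicates it is safe to accumulate more data in this session.
            while (scheduled && lease.continuePolling()) {
                lease.poll();
            }
            // Commit the NiFi session first, then the Kafka offsets (at-least-once semantics).
            if (scheduled && !lease.commit()) {
                context.yield();               // nothing was committed; back off briefly
            }
        } catch (final WakeupException we) {
            // The consumer was woken up (e.g. the processor is stopping); closing the
            // lease rolls back the session and returns or destroys the consumer.
        }
    }
}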

http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
new file mode 100644
index 0000000..a7bd96d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+/**
+ * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
+ * calling 'obtainConsumer'. Once closed the pool is ready to be immediately
+ * used again.
+ */
+public class ConsumerPool implements Closeable {
+
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
+    private final List<String> topics;
+    private final Pattern topicPattern;
+    private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private final boolean honorTransactions;
+    private final RecordReaderFactory readerFactory;
+    private final RecordSetWriterFactory writerFactory;
+    private final Charset headerCharacterSet;
+    private final Pattern headerNamePattern;
+    private final AtomicLong consumerCreatedCountRef = new AtomicLong();
+    private final AtomicLong consumerClosedCountRef = new AtomicLong();
+    private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+
+    /**
+     * Creates a pool of KafkaConsumer objects that will grow up to the maximum
+     * indicated threads from the given context. Consumers are lazily
+     * initialized. We may elect to not create up to the maximum number of
+     * configured consumers if the broker reported lag time for all topics is
+     * below a certain threshold.
+     *
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
+     * @param logger the logger to report any errors/warnings
+     */
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
+        this.topicPattern = null;
+        this.readerFactory = null;
+        this.writerFactory = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
+        this.readerFactory = null;
+        this.writerFactory = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final RecordReaderFactory readerFactory,
+            final RecordSetWriterFactory writerFactory,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger,
+            final boolean honorTransactions,
+            final Charset headerCharacterSet,
+            final Pattern headerNamePattern) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = null;
+        this.keyEncoding = null;
+        this.readerFactory = readerFactory;
+        this.writerFactory = writerFactory;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = topics;
+        this.topicPattern = null;
+        this.honorTransactions = honorTransactions;
+        this.headerCharacterSet = headerCharacterSet;
+        this.headerNamePattern = headerNamePattern;
+    }
+
+    /**
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
+     *
+     * @param session the session for which the consumer lease will be
+     *            associated
+     * @param processContext the ProcessContext for which the consumer
+     *            lease will be associated
+     * @return consumer to use or null if not available or necessary
+     */
+    public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have at best useless threads sitting
+             * around doing frequent network calls and at worst consumers sitting
+             * idle, which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            if (topics != null) {
+                consumer.subscribe(topics, lease);
+            } else {
+                consumer.subscribe(topicPattern, lease);
+            }
+        }
+        lease.setProcessSession(session, processContext);
+
+        leasesObtainedCountRef.incrementAndGet();
+        return lease;
+    }
+
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to subscribe to the given topics
+     */
+    protected Consumer<byte[], byte[]> createKafkaConsumer() {
+        final Map<String, Object> properties = new HashMap<>(kafkaProperties);
+        if (honorTransactions) {
+            properties.put("isolation.level", "read_committed");
+        } else {
+            properties.put("isolation.level", "read_uncommitted");
+        }
+        final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
+        return consumer;
+    }
+
+    /**
+     * Closes all consumers in the pool. Can be safely called repeatedly.
+     */
+    @Override
+    public void close() {
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
+    }
+
+    private void closeConsumer(final Consumer<?, ?> consumer) {
+        consumerClosedCountRef.incrementAndGet();
+        try {
+            consumer.unsubscribe();
+        } catch (Exception e) {
+            logger.warn("Failed while unsubscribing " + consumer, e);
+        }
+
+        try {
+            consumer.close();
+        } catch (Exception e) {
+            logger.warn("Failed while closing " + consumer, e);
+        }
+    }
+
+    PoolStats getPoolStats() {
+        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile ProcessContext processContext;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
+                readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session, final ProcessContext context) {
+            this.session = session;
+            this.processContext = context;
+        }
+
+        @Override
+        public void yield() {
+            if (processContext != null) {
+                processContext.yield();
+            }
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null, null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
+    }
+
+    static final class PoolStats {
+
+        final long consumerCreatedCount;
+        final long consumerClosedCount;
+        final long leasesObtainedCount;
+
+        PoolStats(
+                final long consumerCreatedCount,
+                final long consumerClosedCount,
+                final long leasesObtainedCount
+        ) {
+            this.consumerCreatedCount = consumerCreatedCount;
+            this.consumerClosedCount = consumerClosedCount;
+            this.leasesObtainedCount = leasesObtainedCount;
+        }
+
+        @Override
+        public String toString() {
+            return "Created Consumers [" + consumerCreatedCount + "]\n"
+                    + "Closed Consumers  [" + consumerClosedCount + "]\n"
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
+        }
+
+    }
+
+}
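
As a point of reference, here is a small hypothetical sketch (not part of this commit) of the lifecycle the pool expects: construct it once when the processor is scheduled, hand out leases per onTrigger via obtainConsumer, and close it when the processor stops. The topic name, group id, and property values below are illustrative assumptions only.

// Hypothetical usage sketch (not part of this commit): building and shutting down a ConsumerPool.
package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;

final class ConsumerPoolUsageSketch {

    ConsumerPool createPool(final ComponentLog logger) {
        final Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "nifi-example-group");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        // Demarcator-based pool: one pooled consumer per concurrent task, newline-demarcated bundles.
        return new ConsumerPool(
                1,                                       // maxConcurrentLeases
                "\n".getBytes(StandardCharsets.UTF_8),   // demarcator
                kafkaProps,
                Arrays.asList("my-topic"),               // topics (hypothetical)
                10_000L,                                 // maxWaitMillis
                "utf-8",                                 // keyEncoding
                "PLAINTEXT",                             // securityProtocol
                "localhost:9092",                        // bootstrapServers
                logger,
                true,                                    // honorTransactions -> isolation.level=read_committed
                StandardCharsets.UTF_8,                  // headerCharacterSet
                Pattern.compile(".*"));                  // headerNamePattern
    }

    void shutdown(final ConsumerPool pool, final ComponentLog logger) {
        logger.info(pool.getPoolStats().toString());
        pool.close(); // unsubscribes and closes every pooled consumer
    }
}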

http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
new file mode 100644
index 0000000..317b274
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+
+public class InFlightMessageTracker {
+    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
+    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
+    private final Object progressMutex = new Object();
+    private final ComponentLog logger;
+
+    public InFlightMessageTracker(final ComponentLog logger) {
+        this.logger = logger;
+    }
+
+    public void incrementAcknowledgedCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+        counter.incrementAcknowledgedCount();
+
+        synchronized (progressMutex) {
+            progressMutex.notify();
+        }
+    }
+
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
+    public int getAcknowledgedCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getAcknowledgedCount();
+    }
+
+    public void incrementSentCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+        counter.incrementSentCount();
+    }
+
+    public int getSentCount(final FlowFile flowFile) {
+        final Counts counter = messageCountsByFlowFile.get(flowFile);
+        return (counter == null) ? 0 : counter.getSentCount();
+    }
+
+    public void fail(final FlowFile flowFile, final Exception exception) {
+        failures.putIfAbsent(flowFile, exception);
+        logger.error("Failed to send " + flowFile + " to Kafka", exception);
+
+        synchronized (progressMutex) {
+            progressMutex.notify();
+        }
+    }
+
+    public Exception getFailure(final FlowFile flowFile) {
+        return failures.get(flowFile);
+    }
+
+    public boolean isFailed(final FlowFile flowFile) {
+        return getFailure(flowFile) != null;
+    }
+
+    public void reset() {
+        messageCountsByFlowFile.clear();
+        failures.clear();
+    }
+
+    public PublishResult failOutstanding(final Exception exception) {
+        messageCountsByFlowFile.keySet().stream()
+            .filter(ff -> !isComplete(ff))
+            .filter(ff -> !failures.containsKey(ff))
+            .forEach(ff -> failures.put(ff, exception));
+
+        return createPublishResult();
+    }
+
+    private boolean isComplete(final FlowFile flowFile) {
+        final Counts counts = messageCountsByFlowFile.get(flowFile);
+        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
+            // all messages received successfully.
+            return true;
+        }
+
+        if (failures.containsKey(flowFile)) {
+            // FlowFile failed so is complete
+            return true;
+        }
+
+        return false;
+    }
+
+    private boolean isComplete() {
+        return messageCountsByFlowFile.keySet().stream()
+            .allMatch(flowFile -> isComplete(flowFile));
+    }
+
+    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
+        final long startTime = System.nanoTime();
+        final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
+
+        while (System.nanoTime() < maxTime) {
+            synchronized (progressMutex) {
+                if (isComplete()) {
+                    return;
+                }
+
+                progressMutex.wait(millis);
+            }
+        }
+
+        throw new TimeoutException();
+    }
+
+
+    PublishResult createPublishResult() {
+        return new PublishResult() {
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(final FlowFile flowFile) {
+                return getAcknowledgedCount(flowFile);
+            }
+
+            @Override
+            public Exception getReasonForFailure(final FlowFile flowFile) {
+                return getFailure(flowFile);
+            }
+        };
+    }
+
+    public static class Counts {
+        private final AtomicInteger sentCount = new AtomicInteger(0);
+        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
+
+        public void incrementSentCount() {
+            sentCount.incrementAndGet();
+        }
+
+        public void incrementAcknowledgedCount() {
+            acknowledgedCount.incrementAndGet();
+        }
+
+        public int getAcknowledgedCount() {
+            return acknowledgedCount.get();
+        }
+
+        public int getSentCount() {
+            return sentCount.get();
+        }
+    }
+}
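
To make the intended call pattern concrete, below is a hypothetical sketch (not part of this commit) of how a publisher might use InFlightMessageTracker around an asynchronous Kafka send: increment the sent count per message, increment the acknowledged count (or record a failure) in the producer callback, then await completion before building a PublishResult (defined elsewhere in this commit). The payload and timeout are illustrative assumptions.

// Hypothetical usage sketch (not part of this commit): tracking asynchronous sends.
package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;

final class InFlightTrackerUsageSketch {

    PublishResult publish(final Producer<byte[], byte[]> producer, final FlowFile flowFile,
                          final String topic, final ComponentLog logger) {
        final InFlightMessageTracker tracker = new InFlightMessageTracker(logger);

        final byte[] value = "hello".getBytes(StandardCharsets.UTF_8); // placeholder payload
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value);

        tracker.incrementSentCount(flowFile);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                tracker.incrementAcknowledgedCount(flowFile); // message acked by the broker
            } else {
                tracker.fail(flowFile, exception);            // record the failure for this FlowFile
            }
        });

        try {
            tracker.awaitCompletion(5_000L); // block until every sent message is acked or failed
            return tracker.createPublishResult();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return tracker.failOutstanding(e);
        } catch (final TimeoutException e) {
            return tracker.failOutstanding(e);
        }
    }
}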

http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
new file mode 100644
index 0000000..de28995
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class KafkaProcessorUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaProcessorUtils.class);
+
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
+    static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
+    static final String KAFKA_KEY = "kafka.key";
+    static final String KAFKA_TOPIC = "kafka.topic";
+    static final String KAFKA_PARTITION = "kafka.partition";
+    static final String KAFKA_OFFSET = "kafka.offset";
+    static final String KAFKA_COUNT = "kafka.count";
+    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
+    static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+    static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
+
+    static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
+            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+            .displayName("Kafka Brokers")
+            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("localhost:9092")
+            .build();
+    static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+            .name("security.protocol")
+            .displayName("Security Protocol")
+            .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+            .required(true)
+            .expressionLanguageSupported(false)
+            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
+            .defaultValue(SEC_PLAINTEXT.getValue())
+            .build();
+    static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
+            .name("sasl.kerberos.service.name")
+            .displayName("Kerberos Service Name")
+            .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
+                    + "Corresponds to Kafka's 'sasl.kerberos.service.name' property. "
+                    + "It is ignored unless one of the SASL options of the <Security Protocol> is selected.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
+            .name("sasl.kerberos.principal")
+            .displayName("Kerberos Principal")
+            .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected that a JAAS configuration file "
+                    + "is referenced via the JVM properties defined in the bootstrap.conf file. This principal will be set into Kafka's 'sasl.jaas.config' property.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
+            .name("sasl.kerberos.keytab")
+            .displayName("Kerberos Keytab")
+            .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected that a JAAS configuration file "
+                    + "is referenced via the JVM properties defined in the bootstrap.conf file. This keytab will be set into Kafka's 'sasl.jaas.config' property.")
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .build();
+    static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("ssl.context.service")
+            .displayName("SSL Context Service")
+            .description("Specifies the SSL Context Service to use for communicating with Kafka.")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    static List<PropertyDescriptor> getCommonPropertyDescriptors() {
+        return Arrays.asList(
+                BOOTSTRAP_SERVERS,
+                SECURITY_PROTOCOL,
+                KERBEROS_PRINCIPLE,
+                USER_PRINCIPAL,
+                USER_KEYTAB,
+                SSL_CONTEXT_SERVICE
+        );
+    }
+
+    static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
+        List<ValidationResult> results = new ArrayList<>();
+
+        String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+        /*
+         * Validates that, if one of the SASL (Kerberos) options is selected as the
+         * security protocol, the Kerberos service name is provided as well.
+         */
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+            if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
+                results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+                        .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
+                                + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
+                                + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
+                        .build());
+            }
+
+            String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
+            String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
+            if ((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
+                    || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
+                results.add(new ValidationResult.Builder().subject(USER_PRINCIPAL.getDisplayName()).valid(false)
+                        .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+                                + "must be set when either is provided.")
+                        .build());
+            }
+        }
+
+        // If SSL or SASL_SSL is selected, the SSL Context Service must be set.
+        final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
+        final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
+        if (csSet && !sslProtocol) {
+            results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
+                    .explanation("If the SSL Context Service is set, an SSL-based security protocol must also be selected.").build());
+        }
+        if (!csSet && sslProtocol) {
+            results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
+                    .explanation("An SSL-based security protocol requires the SSL Context Service to be set.").build());
+        }
+
+        final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
+        if (enableAutoCommit != null && !"false".equalsIgnoreCase(enableAutoCommit)) {
+            results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).valid(false)
+                    .explanation("Enable auto commit must be false. It is managed by the processor.").build());
+        }
+
+        final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
+            results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).valid(false)
+                    .explanation("Key Serializer must be '" + ByteArraySerializer.class.getName() + "' but was '" + keySerializer + "'").build());
+        }
+
+        final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
+            results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).valid(false)
+                    .explanation("Value Serializer must be '" + ByteArraySerializer.class.getName() + "' but was '" + valueSerializer + "'").build());
+        }
+
+        final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
+            results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).valid(false)
+                    .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' but was '" + keyDeSerializer + "'").build());
+        }
+
+        final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+        if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
+            results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).valid(false)
+                    .explanation("Value De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' but was '" + valueDeSerializer + "'").build());
+        }
+
+        return results;
+    }
+
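+    /**
+     * Validator for dynamic (user-defined) properties: a property is considered valid only if its name matches
+     * a public static String configuration key declared by the supplied Kafka config class, or by
+     * {@link CommonClientConfigs}, {@link SslConfigs} or {@link SaslConfigs}.
+     */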
+    static final class KafkaConfigValidator implements Validator {
+
+        final Class<?> classType;
+
+        public KafkaConfigValidator(final Class<?> classType) {
+            this.classType = classType;
+        }
+
+        @Override
+        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
+            return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this Kafka client").valid(knownValue).build();
+        }
+    }
+
+    /**
+     * Builds the transit URI for a provenance event. The transit URI will be in the
+     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
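+     * e.g. {@code PLAINTEXT://localhost:9092/myTopic} (topic and broker shown are illustrative).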
+     */
+    static String buildTransitURI(String securityProtocol, String brokers, String topic) {
+        StringBuilder builder = new StringBuilder();
+        builder.append(securityProtocol);
+        builder.append("://");
+        builder.append(brokers);
+        builder.append("/");
+        builder.append(topic);
+        return builder.toString();
+    }
+
+
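+    /**
+     * Copies the processor's configured properties into the supplied Kafka configuration map: the SSL Context
+     * Service (if set) is translated into the corresponding ssl.* properties, values of ".ms" properties given
+     * in NiFi time period notation are converted to milliseconds, and only names that are known Kafka
+     * configuration keys are passed through. Kerberos principal/keytab are handled via 'sasl.jaas.config'.
+     */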
+    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+        for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
+            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+                // Translate SSLContext Service configuration into Kafka properties
+                final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+                if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
+                    final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
+                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
+                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
+                }
+
+                if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
+                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
+                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
+                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
+                }
+            }
+
+            String propertyName = propertyDescriptor.getName();
+            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+                    ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
+                    : context.getProperty(propertyDescriptor).getValue();
+
+            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
+                // or the standard NiFi time period such as "5 secs"
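+                // e.g. a configured value of "5 secs" is converted to "5000" before being handed to the Kafka client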
+                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
+                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
+                }
+
+                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+                    mapToPopulate.put(propertyName, propertyValue);
+                }
+            }
+        }
+
+        String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+            setJaasConfig(mapToPopulate, context);
+        }
+    }
+
+    /**
+     * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
+     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
+     * <br />
+     * It expects something with the following format: <br />
+     * <br />
+     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
+     * ControlFlag = required / requisite / sufficient / optional
+     *
+     * @param mapToPopulate Map of configuration properties
+     * @param context Context
+     */
+    private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+        String keytab = context.getProperty(USER_KEYTAB).getValue();
+        String principal = context.getProperty(USER_PRINCIPAL).getValue();
+        String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
+        if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+                    + "useTicketCache=false "
+                    + "renewTicket=true "
+                    + "serviceName=\"" + serviceName + "\" "
+                    + "useKeyTab=true "
+                    + "keyTab=\"" + keytab + "\" "
+                    + "principal=\"" + principal + "\";");
+        }
+    }
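+    // Illustrative example of the value produced above, using hypothetical principal and keytab values
+    // (the actual values come from the Kerberos Principal / Kerberos Keytab / Kerberos Service Name properties):
+    //
+    //   com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false renewTicket=true
+    //   serviceName="kafka" useKeyTab=true keyTab="/etc/security/keytabs/nifi.keytab" principal="nifi@EXAMPLE.COM";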
+
+    private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+        return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
+    }
+
+    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
+        final Set<String> strings = new HashSet<>();
+        for (final Class<?> classType : classes) {
+            for (final Field field : classType.getDeclaredFields()) {
+                if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
+                    try {
+                        strings.add(String.valueOf(field.get(null)));
+                    } catch (IllegalArgumentException | IllegalAccessException ex) {
+                        //ignore
+                    }
+                }
+            }
+        }
+        return strings;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
new file mode 100644
index 0000000..64ab4ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+public final class Partitioners {
+
+    private Partitioners() {
+    }
+
+    /**
+     * {@link Partitioner} that implements a 'round-robin' mechanism, evenly
+     * distributing load across all available partitions.
+     */
+    public static class RoundRobinPartitioner implements Partitioner {
+
+        private volatile int index;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // noop
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+            return this.next(cluster.availablePartitionsForTopic(topic).size());
+        }
+
+        @Override
+        public void close() {
+            // noop
+        }
+
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
+            }
+            return index++;
+        }
+    }
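+    // Illustrative usage (property values are hypothetical): the partitioner is typically referenced by class
+    // name in the producer configuration, e.g.
+    //   props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioners.RoundRobinPartitioner.class.getName());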
+}