Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 19:49:10 UTC

[1/3] nifi git commit: NIFI-2732 ensure session and consumer are aligned and have a registered rebalance listener. Make consumption far more memory- and process-efficient; fix extraneous getbundled call

Repository: nifi
Updated Branches:
  refs/heads/master 088125451 -> 7a451935a


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
 
 /**
  * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class ConsumerPool implements Closeable {
 
-    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
-    private final int maxLeases;
-    private final Queue<ConsumerLease> consumerLeases;
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
     private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
     private final ComponentLog logger;
-
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
-    private final AtomicLong productivePollCountRef = new AtomicLong();
-    private final AtomicLong unproductivePollCountRef = new AtomicLong();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
-     * indicated leases. Consumers are lazily initialized.
+     * indicated threads from the given context. Consumers are lazily
+     * initialized. We may elect to not create up to the maximum number of
+     * configured consumers if the broker reported lag time for all topics is
+     * below a certain threshold.
      *
-     * @param maxLeases maximum number of active leases in the pool
-     * @param topics the topics to consume from
-     * @param kafkaProperties the properties for each consumer
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
      * @param logger the logger to report any errors/warnings
      */
-    public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
-        this.maxLeases = maxLeases;
-        if (maxLeases <= 0) {
-            throw new IllegalArgumentException("Max leases value must be greather than zero.");
-        }
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, String> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
-        if (topics == null || topics.isEmpty()) {
-            throw new IllegalArgumentException("Must have a list of one or more topics");
-        }
-        this.topics = topics;
-        this.kafkaProperties = new HashMap<>(kafkaProperties);
-        this.consumerLeases = new ArrayDeque<>();
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
     }
 
     /**
-     * Obtains a consumer from the pool if one is available
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
      *
-     * @return consumer from the pool
-     * @throws IllegalArgumentException if pool already contains
+     * @param session the session for which the consumer lease will be
+     * associated
+     * @return consumer to use or null if not available or necessary
      */
-    public ConsumerLease obtainConsumer() {
-        final ConsumerLease lease;
-        final int activeLeases;
-        synchronized (this) {
-            lease = consumerLeases.poll();
-            activeLeases = activeLeaseCount.get();
-        }
-        if (lease == null && activeLeases >= maxLeases) {
-            logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
-            return null;
+    public ConsumerLease obtainConsumer(final ProcessSession session) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have at best useless threads sitting
+             * around doing frequent network calls and at worst having consumers
+             * sitting idle which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            consumer.subscribe(topics, lease);
         }
+        lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();
-        return (lease == null) ? createConsumer() : lease;
+        return lease;
     }
 
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to subscribe to the given topics
+     */
     protected Consumer<byte[], byte[]> createKafkaConsumer() {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
-    private ConsumerLease createConsumer() {
-        final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
-        consumerCreatedCountRef.incrementAndGet();
-        try {
-            kafkaConsumer.subscribe(topics);
-        } catch (final KafkaException kex) {
-            try {
-                kafkaConsumer.close();
-                consumerClosedCountRef.incrementAndGet();
-            } catch (final Exception ex) {
-                consumerClosedCountRef.incrementAndGet();
-                //ignore
-            }
-            throw kex;
-        }
-
-        final ConsumerLease lease = new ConsumerLease() {
-
-            private volatile boolean poisoned = false;
-            private volatile boolean closed = false;
-
-            @Override
-            public ConsumerRecords<byte[], byte[]> poll() {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
-                    if (records.isEmpty()) {
-                        unproductivePollCountRef.incrementAndGet();
-                    } else {
-                        productivePollCountRef.incrementAndGet();
-                    }
-                    return records;
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-                try {
-                    kafkaConsumer.commitSync(offsets);
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void close() {
-                if (closed) {
-                    return;
-                }
-                if (poisoned || activeLeaseCount.get() > maxLeases) {
-                    closeConsumer(kafkaConsumer);
-                    activeLeaseCount.decrementAndGet();
-                    closed = true;
-                } else {
-                    final boolean added;
-                    synchronized (ConsumerPool.this) {
-                        added = consumerLeases.offer(this);
-                    }
-                    if (!added) {
-                        closeConsumer(kafkaConsumer);
-                        activeLeaseCount.decrementAndGet();
-                    }
-                }
-            }
-
-            @Override
-            public void poison() {
-                poisoned = true;
-            }
-        };
-        activeLeaseCount.incrementAndGet();
-        return lease;
-    }
-
     /**
-     * Closes all consumers in the pool. Can be safely recalled.
+     * Closes all consumers in the pool. Can be safely called repeatedly.
      */
     @Override
     public void close() {
-        final List<ConsumerLease> leases = new ArrayList<>();
-        synchronized (this) {
-            ConsumerLease lease = null;
-            while ((lease = consumerLeases.poll()) != null) {
-                leases.add(lease);
-            }
-        }
-        for (final ConsumerLease lease : leases) {
-            lease.poison();
-            lease.close();
-        }
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
     }
 
     private void closeConsumer(final Consumer consumer) {
+        consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();
         } catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
 
         try {
             consumer.close();
-            consumerClosedCountRef.incrementAndGet();
         } catch (Exception e) {
-            consumerClosedCountRef.incrementAndGet();
             logger.warn("Failed while closing " + consumer, e);
         }
     }
 
     PoolStats getPoolStats() {
-        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
     }
 
     static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
         final long consumerCreatedCount;
         final long consumerClosedCount;
         final long leasesObtainedCount;
-        final long productivePollCount;
-        final long unproductivePollCount;
 
         PoolStats(
                 final long consumerCreatedCount,
                 final long consumerClosedCount,
-                final long leasesObtainedCount,
-                final long productivePollCount,
-                final long unproductivePollCount
+                final long leasesObtainedCount
         ) {
             this.consumerCreatedCount = consumerCreatedCount;
             this.consumerClosedCount = consumerClosedCount;
             this.leasesObtainedCount = leasesObtainedCount;
-            this.productivePollCount = productivePollCount;
-            this.unproductivePollCount = unproductivePollCount;
         }
 
         @Override
         public String toString() {
             return "Created Consumers [" + consumerCreatedCount + "]\n"
                     + "Closed Consumers  [" + consumerClosedCount + "]\n"
-                    + "Leases Obtained   [" + leasesObtainedCount + "]\n"
-                    + "Productive Polls  [" + productivePollCount + "]\n"
-                    + "Unproductive Polls  [" + unproductivePollCount + "]\n";
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
         }
 
     }
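
To make the reworked lease lifecycle concrete, here is a minimal usage sketch written against the ConsumerLease API this commit exercises elsewhere (obtainConsumer(session), continuePolling(), poll(), commit(), close()). It is illustrative only and is not the actual ConsumeKafka.onTrigger implementation:

    import org.apache.kafka.common.KafkaException;
    import org.apache.nifi.processor.ProcessSession;

    // Illustrative only: shows the intended lease lifecycle against the pool above.
    // Assumes ConsumerLease exposes continuePolling()/poll()/commit(), as the tests below do.
    class LeaseUsageSketch {

        private final ConsumerPool consumerPool;

        LeaseUsageSketch(final ConsumerPool consumerPool) {
            this.consumerPool = consumerPool;
        }

        void consumeOnce(final ProcessSession session) {
            try (final ConsumerLease lease = consumerPool.obtainConsumer(session)) {
                if (lease == null) {
                    return; // the pool may decline to hand out a consumer
                }
                try {
                    while (lease.continuePolling()) {
                        lease.poll();   // records are written into the session bound to the lease
                    }
                    lease.commit();     // commits Kafka offsets and the NiFi session together
                } catch (final KafkaException ke) {
                    // a failed poll poisons the lease (see ConsumerPoolTest below);
                    // close() will then tear the underlying consumer down
                }
            } // close(): a healthy lease returns to the pool, a poisoned one is closed for real
        }
    }

Note that obtainConsumer passes the lease itself as the second argument to consumer.subscribe(topics, lease), registering it as the rebalance listener; that is what keeps the consumer and its NiFi session aligned across partition reassignments, per the commit message.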

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index c74ad18..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
 
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
     static final String KAFKA_KEY = "kafka.key";
@@ -96,7 +100,6 @@ final class KafkaProcessorUtils {
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(false)
             .build();
-
     static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("ssl.context.service")
             .displayName("SSL Context Service")
@@ -227,7 +230,6 @@ final class KafkaProcessorUtils {
                     mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
                 }
             }
-
             String pName = propertyDescriptor.getName();
             String pValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
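
The new HEX_ENCODING allowable value above describes keys rendered as uppercase hexadecimal, two characters per byte, which HEX_KEY_PATTERN accepts (the pattern also tolerates lowercase). A small, self-contained sketch of that encoding (the HexKeySketch type and toHexKey helper are hypothetical and are not part of KafkaProcessorUtils):

    import java.nio.charset.StandardCharsets;
    import java.util.regex.Pattern;

    // Hypothetical helper illustrating the "Hex Encoded" key option added above:
    // arbitrary key bytes become uppercase hex, matching HEX_KEY_PATTERN.
    final class HexKeySketch {

        private static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");

        static String toHexKey(final byte[] key) {
            final StringBuilder sb = new StringBuilder(key.length * 2);
            for (final byte b : key) {
                sb.append(String.format("%02X", b)); // two uppercase hex characters per byte
            }
            return sb.toString();
        }

        public static void main(final String[] args) {
            final String hex = toHexKey("key1".getBytes(StandardCharsets.UTF_8));
            System.out.println(hex);                                     // 6B657931
            System.out.println(HEX_KEY_PATTERN.matcher(hex).matches());  // true
        }
    }

For the key "key1" this yields 6B657931, the same value the removed validateHexKey test computed by upper-casing Integer.toHexString of each character.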

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7e4b12c..8e3fa3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,105 +16,36 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
-    static class MockConsumerPool extends ConsumerPool {
-
-        final int actualMaxLeases;
-        final List<String> actualTopics;
-        final Map<String, String> actualKafkaProperties;
-        boolean throwKafkaExceptionOnPoll = false;
-        boolean throwKafkaExceptionOnCommit = false;
-        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
-        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
-        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
-        boolean wasConsumerLeasePoisoned = false;
-        boolean wasConsumerLeaseClosed = false;
-        boolean wasPoolClosed = false;
-
-        public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
-            super(maxLeases, topics, kafkaProperties, null);
-            actualMaxLeases = maxLeases;
-            actualTopics = topics;
-            actualKafkaProperties = kafkaProperties;
-        }
-
-        @Override
-        public ConsumerLease obtainConsumer() {
-            return new ConsumerLease() {
-                @Override
-                public ConsumerRecords<byte[], byte[]> poll() {
-                    if (throwKafkaExceptionOnPoll) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
-                    return (records == null) ? ConsumerRecords.empty() : records;
-                }
-
-                @Override
-                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
-                    if (throwKafkaExceptionOnCommit) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    actualCommitOffsets = offsets;
-                }
-
-                @Override
-                public void poison() {
-                    wasConsumerLeasePoisoned = true;
-                }
-
-                @Override
-                public void close() {
-                    wasConsumerLeaseClosed = true;
-                }
-            };
-        }
-
-        @Override
-        public void close() {
-            wasPoolClosed = true;
-        }
-
-        void resetState() {
-            throwKafkaExceptionOnPoll = false;
-            throwKafkaExceptionOnCommit = false;
-            nextPlannedRecordsQueue = null;
-            nextExpectedCommitOffsets = null;
-            wasConsumerLeasePoisoned = false;
-            wasConsumerLeaseClosed = false;
-            wasPoolClosed = false;
-        }
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
 
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
     }
 
     @Test
@@ -175,365 +106,45 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-        } else {
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        }
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @Test
-    public void validateGetLotsOfMessages() throws Exception {
-        String groupName = "validateGetLotsOfMessages";
-
-        final byte[][] firstPassValues = new byte[10010][1];
-        for (final byte[] value : firstPassValues) {
-            value[0] = 0x12;
-        }
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
-        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
-        assertEquals(1, mockPool.actualCommitOffsets.size());
-        assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-        for (final byte[] rawRecord : rawRecords) {
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-
-        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
-            final byte[] key = entry.getKey();
-            final byte[] rawRecord = entry.getValue();
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        for (final ConsumerRecords<byte[], byte[]> rec : records) {
-            rec.partitions().stream().forEach((part) -> {
-                final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
-                if (map.get(part) != null) {
-                    throw new IllegalStateException("already have that topic/partition in the record map");
-                }
-                map.put(part, conRecs);
-            });
-        }
-        return new ConsumerRecords<>(map);
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
-        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues),
-                createConsumerRecords("bar", 1, 1L, secondPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
-
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertEquals(2, mockPool.actualCommitOffsets.size());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-    }
-
-    @Test
-    public void validatePollException() throws Exception {
-        String groupName = "validatePollException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnPoll = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(0, flowFiles.size());
-        assertNull(null, mockPool.actualCommitOffsets);
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateCommitOffsetException() throws Exception {
-        String groupName = "validateCommitOffsetException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnCommit = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(1, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertNull(null, mockPool.actualCommitOffsets);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
     @Test
-    public void validateUtf8Key() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
 
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -542,89 +153,15 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
-    @Test
-    public void validateHexKey() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
 }
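
The sequential stubbing in the rewritten tests (TRUE, TRUE, FALSE) is what produces the verification counts above: continuePolling() is consulted once more than poll() runs, so two productive answers mean three checks and two polls. A minimal, self-contained Mockito sketch of the same check-then-poll pattern (the DrainLoop interface is hypothetical and merely stands in for the lease):

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    // Hypothetical stand-in for the lease-driven loop: each iteration asks
    // continuePolling() before it polls, so N TRUE answers mean N polls and N+1 checks.
    public class SequentialStubSketch {

        interface DrainLoop {
            boolean continuePolling();
            void poll();
        }

        public static void main(final String[] args) {
            final DrainLoop loop = mock(DrainLoop.class);
            when(loop.continuePolling()).thenReturn(true, true, false);

            while (loop.continuePolling()) {
                loop.poll();
            }

            verify(loop, times(3)).continuePolling(); // two TRUEs plus the terminating FALSE
            verify(loop, times(2)).poll();            // one poll per TRUE answer
        }
    }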

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
     Consumer<byte[], byte[]> consumer = null;
+    ProcessSession mockSession = null;
+    ProvenanceReporter mockReporter = null;
+    ConsumerPool testPool = null;
+    ConsumerPool testDemarcatedPool = null;
     ComponentLog logger = null;
 
     @Before
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
-    }
-
-    @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
             }
         };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
 
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
             lease.poll();
-            lease.commitOffsets(Collections.emptyMap());
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
-        assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(1, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
+        assertEquals(4, stats.leasesObtainedCount);
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
 
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
-                lease.commitOffsets(Collections.emptyMap());
             }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(100, stats.leasesObtainedCount);
-        assertEquals(10000, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
 
     @Test
-    public void validatePoolConsumerFails() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
             lease.poll();
-            fail();
-        } catch (final KafkaException ke) {
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
 
+            }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(0, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
 }
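
The tests above stub Consumer.poll() with the createConsumerRecords() helper so a lease sees either real records or an empty batch. Below is a minimal, self-contained sketch of the same stubbing pattern, assuming Mockito and the Kafka 0.10 client are on the test classpath; the class and method names are illustrative and not part of this patch:

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;
    import static org.mockito.Matchers.anyLong;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class ConsumerStubSketch {

        @SuppressWarnings("unchecked")
        static Consumer<byte[], byte[]> stubbedConsumer() {
            final TopicPartition tp = new TopicPartition("nifi", 0);
            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord<>(
                    "nifi", 0, 0L,
                    "key".getBytes(StandardCharsets.UTF_8),
                    "Hello-1".getBytes(StandardCharsets.UTF_8));
            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data =
                    Collections.singletonMap(tp, Collections.singletonList(rec));
            final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
            // first poll returns one record, every later poll returns an empty batch,
            // mirroring the firstRecs / empty-records pattern in the tests above
            when(consumer.poll(anyLong()))
                    .thenReturn(new ConsumerRecords<>(data))
                    .thenReturn(ConsumerRecords.<byte[], byte[]>empty());
            return consumer;
        }
    }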


[3/3] nifi git commit: NIFI-2732 ensure session and consumer aligned and has registered rebalance listener. Make consumption far more memory and process efficient, fixed extraneous getbundled call

Posted by bb...@apache.org.
NIFI-2732 ensure session and consumer aligned and has registered rebalance listener. Make consumption far more memory and process efficient, fixed extraneous getbundled call

This closes #987.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a451935
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a451935
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a451935

Branch: refs/heads/master
Commit: 7a451935a583d4d7fd4eccf47a869552c2f8dda7
Parents: 0881254
Author: joewitt <jo...@apache.org>
Authored: Wed Aug 31 15:25:12 2016 +1000
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 6 15:48:40 2016 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafka_0_10.java         | 265 ++-------
 .../processors/kafka/pubsub/ConsumerLease.java  | 359 +++++++++++-
 .../processors/kafka/pubsub/ConsumerPool.java   | 285 +++++-----
 .../kafka/pubsub/KafkaProcessorUtils.java       |   4 +
 .../kafka/pubsub/PublishKafka_0_10.java         |   6 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          | 548 ++----------------
 .../kafka/pubsub/ConsumerPoolTest.java          | 172 ++++--
 .../kafka/pubsub/PublishKafkaTest.java          |   2 +-
 .../processors/kafka/pubsub/ConsumeKafka.java   | 268 ++-------
 .../processors/kafka/pubsub/ConsumerLease.java  | 359 +++++++++++-
 .../processors/kafka/pubsub/ConsumerPool.java   | 285 +++++-----
 .../kafka/pubsub/KafkaProcessorUtils.java       |   6 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          | 555 ++-----------------
 .../kafka/pubsub/ConsumerPoolTest.java          | 172 ++++--
 14 files changed, 1413 insertions(+), 1873 deletions(-)
----------------------------------------------------------------------
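
The commit subject above highlights registering a rebalance listener at subscribe time; in the diffs below this is done by ConsumerPool.obtainConsumer() via consumer.subscribe(topics, lease). A standalone sketch of that Kafka 0.10 pattern outside of NiFi, where the broker address, group id, and class name are placeholders:

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RebalanceListenerSketch {

        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
            props.put("group.id", "sketch-group");            // placeholder group
            props.put("enable.auto.commit", "false");         // offsets committed by the caller
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            // registering the listener at subscribe time couples it to this consumer;
            // its callbacks run inside poll(), on the polling thread
            consumer.subscribe(Collections.singletonList("nifi"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                    // commit whatever has been processed before the partitions move away
                    consumer.commitSync();
                }

                @Override
                public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                    // nothing required here; the new assignment is simply observed
                }
            });
            try {
                consumer.poll(100); // any rebalance callbacks fire during this call
            } finally {
                consumer.close();
            }
        }
    }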


http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 847f8a4..e799876 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -25,13 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,7 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10 Consumer API. "
         + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
@@ -63,7 +57,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
-        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
 public class ConsumeKafka_0_10 extends AbstractProcessor {
 
-    private static final long FIVE_MB = 5L * 1024L * 1024L;
-
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
     static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
             .name("max.poll.records")
             .displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final List<PropertyDescriptor> DESCRIPTORS;
     static final Set<Relationship> RELATIONSHIPS;
 
-    private volatile byte[] demarcatorBytes = null;
     private volatile ConsumerPool consumerPool = null;
 
     static {
@@ -165,6 +167,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
         RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     }
@@ -179,16 +182,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         return DESCRIPTORS;
     }
 
-    @OnScheduled
-    public void prepareProcessing(final ProcessContext context) {
-        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
-                : null;
-    }
-
     @OnStopped
     public void close() {
-        demarcatorBytes = null;
         final ConsumerPool pool = consumerPool;
         consumerPool = null;
         if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             return pool;
         }
 
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
+                ? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
         final Map<String, String> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         for (final String topic : topicListing.split(",", 100)) {
             final String trimmedName = topic.trim();
@@ -225,212 +232,40 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                 topics.add(trimmedName);
             }
         }
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
-    }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
 
-    protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-        return new ConsumerPool(maxLeases, topics, props, log);
+        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final long startTimeNanos = System.nanoTime();
         final ConsumerPool pool = getConsumerPool(context);
         if (pool == null) {
             context.yield();
             return;
         }
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
-        try (final ConsumerLease lease = pool.obtainConsumer()) {
-            try {
-                if (lease == null) {
-                    context.yield();
-                    return;
-                }
 
-                final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
-                if (!foundData) {
-                    session.rollback();
-                    return;
-                }
-
-                writeSessionData(context, session, partitionRecordMap, startTimeNanos);
-                //At-least once commit handling (if order is reversed it is at-most once)
-                session.commit();
-                commitOffsets(lease, partitionRecordMap);
-            } catch (final KafkaException ke) {
-                lease.poison();
-                getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+        try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+            if (lease == null) {
                 context.yield();
-                session.rollback();
+                return;
             }
-        }
-    }
-
-    private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
-        final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
-        partitionRecordMap.entrySet().stream()
-                .filter(entry -> !entry.getValue().isEmpty())
-                .forEach((entry) -> {
-                    long maxOffset = entry.getValue().stream()
-                            .mapToLong(record -> record.offset())
-                            .max()
-                            .getAsLong();
-                    partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
-                });
-        lease.commitOffsets(partOffsetMap);
-    }
-
-    private void writeSessionData(
-            final ProcessContext context, final ProcessSession session,
-            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
-            final long startTimeNanos) {
-        if (demarcatorBytes != null) {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .forEach(entry -> {
-                        writeData(context, session, entry.getValue(), startTimeNanos);
-                    });
-        } else {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .flatMap(entry -> entry.getValue().stream())
-                    .forEach(record -> {
-                        writeData(context, session, Collections.singletonList(record), startTimeNanos);
-                    });
-        }
-    }
-
-    private String encodeKafkaKey(final byte[] key, final String encoding) {
-        if (key == null) {
-            return null;
-        }
-
-        if (HEX_ENCODING.getValue().equals(encoding)) {
-            return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
-            return new String(key, StandardCharsets.UTF_8);
-        } else {
-            return null;    // won't happen because it is guaranteed by the Allowable Values
-        }
-    }
-
-    private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
-        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
-        final String offset = String.valueOf(firstRecord.offset());
-        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
-        final String topic = firstRecord.topic();
-        final String partition = String.valueOf(firstRecord.partition());
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, out -> {
-            boolean useDemarcator = false;
-            for (final ConsumerRecord<byte[], byte[]> record : records) {
-                if (useDemarcator) {
-                    out.write(demarcatorBytes);
-                }
-                out.write(record.value());
-                useDemarcator = true;
-            }
-        });
-        final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyValue != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
-        }
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
-        if (records.size() > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
-        }
-        flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
-        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(
-                context.getProperty(SECURITY_PROTOCOL).getValue(),
-                context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
-                topic);
-        session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
-        this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
-                new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
-        session.transfer(flowFile, REL_SUCCESS);
-    }
-
-    /**
-     * Populates the given partitionRecordMap with new records until we poll
-     * that returns no records or until we have enough data. It is important to
-     * ensure we keep items grouped by their topic and partition so that when we
-     * bundle them we bundle them intelligently and so that we can set offsets
-     * properly even across multiple poll calls.
-     */
-    private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
-        final long startNanos = System.nanoTime();
-        boolean foundData = false;
-        ConsumerRecords<byte[], byte[]> records;
-        final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-        do {
-            records = lease.poll();
-
-            for (final TopicPartition partition : records.partitions()) {
-                List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
-                if (currList == null) {
-                    currList = new ArrayList<>();
-                    partitionRecordMap.put(partition, currList);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
                 }
-                currList.addAll(records.records(partition));
-                if (currList.size() > 0) {
-                    foundData = true;
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
                 }
-            }
-            //If we received data and we still want to get more
-        } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
-        return foundData;
-    }
-
-    /**
-     * Determines if we have enough data as-is and should move on.
-     *
-     * @return true if we've been gathering for more than 500 ms or if we're
-     * demarcating and have more than 50 flowfiles worth or if we're per message
-     * and have more than 2000 flowfiles or if totalMessageSize is greater than
-     * two megabytes; false otherwise
-     *
-     * Implementation note: 500 millis and 5 MB are magic numbers. These may
-     * need to be tuned. They get at how often offsets will get committed to
-     * kafka relative to how many records will get buffered into memory in a
-     * poll call before writing to repos.
-     */
-    private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final int maxRecords, final long startTimeNanos) {
-
-        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
-        if (durationMillis > 500) {
-            return true;
-        }
-
-        int topicPartitionsFilled = 0;
-        int totalRecords = 0;
-        long totalRecordSize = 0;
-
-        for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
-            if (!recordList.isEmpty()) {
-                topicPartitionsFilled++;
-            }
-            totalRecords += recordList.size();
-            for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
-                totalRecordSize += rec.value().length;
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
             }
         }
-
-        if (demarcatorBytes != null && demarcatorBytes.length > 0) {
-            return topicPartitionsFilled > 50;
-        } else if (totalRecordSize > FIVE_MB) {
-            return true;
-        } else {
-            return totalRecords > maxRecords;
-        }
     }
-
 }
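
After this change onTrigger() reduces to a lease-driven loop: poll while the processor is scheduled and the lease says to continue, then commit once, yielding when nothing was gathered. A minimal sketch of that control flow follows; the Lease and LeasePool types are simplified stand-ins for the ConsumerLease and ConsumerPool in this patch, not the actual classes, and the error handling from the real processor (poisoning the lease on KafkaException) is omitted to keep the loop shape visible:

    import java.io.Closeable;

    interface Lease extends Closeable {
        boolean continuePolling();
        void poll();
        boolean commit();
        void close(); // narrows Closeable#close so it does not throw
    }

    interface LeasePool {
        Lease obtain(); // may return null when no consumer is available
    }

    public class ConsumeLoopSketch {

        private volatile boolean scheduled = true;

        // mirrors the shape of the new onTrigger(): drain while allowed, then commit once
        void onTrigger(final LeasePool pool, final Runnable yield) {
            try (final Lease lease = pool.obtain()) {
                if (lease == null) {
                    yield.run();
                    return;
                }
                while (scheduled && lease.continuePolling()) {
                    lease.poll();        // each poll may write flowfile content
                }
                if (scheduled && !lease.commit()) {
                    yield.run();         // nothing gathered; back off before retrying
                }
            }
        }
    }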

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..f7a2e57 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
  * the lease will be returned to the pool for future use by others. A given
  * lease may only belong to a single thread a time.
  */
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalFlowFiles = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.logger = logger;
+    }
+
+    /**
+     * clears out internal state elements excluding session and consumer as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalFlowFiles = 0;
+    }
+
+    /**
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We'll simply take this to mean that
+     * we need to quickly commit what we've got and will return the consumer to
+     * the pool. This method will be called during the poll() method call of
+     * this class and will be called by the same thread calling poll according
+     * to the Kafka API docs. After this method executes the session and kafka
+     * offsets are committed and this lease is closed.
+     *
+     * @param partitions partitions being reassigned
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
+        commit();
+    }
 
     /**
-     * Executes a poll on the underlying Kafka Consumer.
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it as
+     * by this point we've committed what we've got and moved on.
      *
-     * @return ConsumerRecords retrieved in the poll.
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @param partitions topic partition set being reassigned
      */
-    ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
+     */
+    void poll() {
+        /**
+         * Implementation note: If we take too long (30 secs?) between kafka
+         * poll calls and our own record processing to any subsequent poll calls
+         * or the commit we can run into a situation where the commit will
+         * succeed to the session but fail on committing offsets. This is
+         * apparently different from the Kafka scenario of electing to rebalance
+         * for other reasons but in this case is due to a session timeout. It
+         * appears Kafka KIP-62 aims to offer more control over the meaning of
+         * various timeouts. If we do run into this case it could result in
+         * duplicates.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
 
     /**
      * Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
      * kafka client to collect more data from Kafka before committing the
      * offsets.
      *
-     * @param offsets offsets
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @return false if we didn't do anything and the caller should probably
+     * yield; true if we committed new data
+     *
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the nifi session then the offsets means we have an at
+             * least once guarantee here. If we reversed the order we'd have at
+             * most once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+            kafkaConsumer.commitSync(uncommittedOffsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator then we're writing individual flow files
+     * per kafka message, therefore we must be very mindful of memory usage for
+     * the flow file objects (not their content) being held in memory. The
+     * content of kafka messages will be written to the content repository
+     * immediately upon each poll call but we must still be mindful of how much
+     * memory can be used in each poll call. We will indicate that we should
+     * stop polling if our last poll call produced no new results, if we've
+     * been polling and processing data longer than the specified maximum
+     * polling time, if we have reached our specified max flow file limit, or
+     * if a rebalance has been initiated for one of the partitions we're
+     * watching; otherwise true.
+     *
+     * @return true if should keep polling; false otherwise
+     */
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalFlowFiles < 15000; //admittedly a magic number - good candidate for processor property
+        }
+    }
+
+    /**
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once closed the session will be rolled back and the
+     * pool should destroy the underlying consumer. This is useful if due to
+     * external reasons, such as the processor no longer being scheduled, this
+     * lease should be terminated immediately.
      */
-    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+    private void poison() {
+        poisoned = true;
+    }
 
     /**
-     * Notifies that this lease is poisoned and should not be reused.
+     * @return true if this lease has been poisoned; false otherwise
      */
-    void poison();
+    boolean isPoisoned() {
+        return poisoned;
+    }
 
     /**
-     * Notifies that this lease is to be returned. The pool may optionally reuse
-     * this lease with another client. No further references by the caller
-     * should occur after calling close.
+     * Abstract method that is intended to be extended by the pool that created
+     * this ConsumerLease object. It should ensure that the session given to
+     * this lease is rolled back and that the underlying kafka consumer
+     * is either returned to the pool for continued use or destroyed if this
+     * lease has been poisoned. It can only be called once. Calling it more than
+     * once can result in undefined and non threadsafe behavior.
      */
     @Override
-    void close();
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+                //write records to content repository and session
+                if (demarcatorBytes == null) {
+                    totalFlowFiles += messages.size();
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                } else {
+                    writeData(getProcessSession(), messages, partition);
+                }
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
+    private Collection<FlowFile> getBundles() {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            populateAttributes(tracker);
+            flowFiles.add(tracker.flowFile);
+        }
+        return flowFiles;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        flowFile = session.write(flowFile, out -> {
+            out.write(record.value());
+        });
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+        final boolean demarcateFirstRecord;
+        BundleTracker tracker = bundleMap.get(topicPartition);
+        FlowFile flowFile;
+        if (tracker == null) {
+            tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+            flowFile = session.create();
+            tracker.updateFlowFile(flowFile);
+            demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+        } else {
+            demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+        }
+        flowFile = tracker.flowFile;
+        tracker.incrementRecordCount(records.size());
+        flowFile = session.append(flowFile, out -> {
+            boolean useDemarcator = demarcateFirstRecord;
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                if (useDemarcator) {
+                    out.write(demarcatorBytes);
+                }
+                out.write(record.value());
+                useDemarcator = true;
+            }
+        });
+        tracker.updateFlowFile(flowFile);
+        bundleMap.put(topicPartition, tracker);
+    }
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
 
 }
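
The commit() added above encodes the delivery guarantee purely in its ordering: transfer the bundles, commit the NiFi session, then commitSync the Kafka offsets, so a failure between the two steps re-delivers records rather than dropping them. A compact sketch of that ordering follows; SessionLike is an illustrative stand-in, not a class in this patch:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class AtLeastOnceCommitSketch {

        /** stand-in for the ProcessSession side of the commit */
        interface SessionLike {
            void commit();
        }

        /**
         * Returns false when there was nothing to commit (the caller should yield),
         * true when both the session and the Kafka offsets were committed.
         */
        static boolean commit(final SessionLike session,
                              final Consumer<byte[], byte[]> consumer,
                              final Map<TopicPartition, OffsetAndMetadata> uncommitted) {
            if (uncommitted.isEmpty()) {
                return false;
            }
            // 1) commit the downstream work first ...
            session.commit();
            // 2) ... then tell Kafka the offsets are consumed. If this step fails after
            //    step 1 succeeded, the same records may be re-delivered (duplicates),
            //    which is the at-least-once trade-off; reversing steps 1 and 2 would
            //    risk losing records instead (at-most-once).
            consumer.commitSync(uncommitted);
            uncommitted.clear();
            return true;
        }
    }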

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
 
 /**
  * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class ConsumerPool implements Closeable {
 
-    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
-    private final int maxLeases;
-    private final Queue<ConsumerLease> consumerLeases;
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
     private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
     private final ComponentLog logger;
-
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
-    private final AtomicLong productivePollCountRef = new AtomicLong();
-    private final AtomicLong unproductivePollCountRef = new AtomicLong();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
-     * indicated leases. Consumers are lazily initialized.
+     * indicated threads from the given context. Consumers are lazily
+     * initialized. We may elect to not create up to the maximum number of
+     * configured consumers if the broker reported lag time for all topics is
+     * below a certain threshold.
      *
-     * @param maxLeases maximum number of active leases in the pool
-     * @param topics the topics to consume from
-     * @param kafkaProperties the properties for each consumer
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
      * @param logger the logger to report any errors/warnings
      */
-    public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
-        this.maxLeases = maxLeases;
-        if (maxLeases <= 0) {
-            throw new IllegalArgumentException("Max leases value must be greather than zero.");
-        }
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, String> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
-        if (topics == null || topics.isEmpty()) {
-            throw new IllegalArgumentException("Must have a list of one or more topics");
-        }
-        this.topics = topics;
-        this.kafkaProperties = new HashMap<>(kafkaProperties);
-        this.consumerLeases = new ArrayDeque<>();
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
     }
 
     /**
-     * Obtains a consumer from the pool if one is available
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
      *
-     * @return consumer from the pool
-     * @throws IllegalArgumentException if pool already contains
+     * @param session the session for which the consumer lease will be
+     * associated
+     * @return consumer to use or null if not available or necessary
      */
-    public ConsumerLease obtainConsumer() {
-        final ConsumerLease lease;
-        final int activeLeases;
-        synchronized (this) {
-            lease = consumerLeases.poll();
-            activeLeases = activeLeaseCount.get();
-        }
-        if (lease == null && activeLeases >= maxLeases) {
-            logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
-            return null;
+    public ConsumerLease obtainConsumer(final ProcessSession session) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have at best useless threads sitting
+             * around doing frequent network calls and at worst having consumers
+             * sitting idle which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            consumer.subscribe(topics, lease);
         }
+        lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();
-        return (lease == null) ? createConsumer() : lease;
+        return lease;
     }
 
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to subscribe to the given topics
+     */
     protected Consumer<byte[], byte[]> createKafkaConsumer() {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
-    private ConsumerLease createConsumer() {
-        final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
-        consumerCreatedCountRef.incrementAndGet();
-        try {
-            kafkaConsumer.subscribe(topics);
-        } catch (final KafkaException kex) {
-            try {
-                kafkaConsumer.close();
-                consumerClosedCountRef.incrementAndGet();
-            } catch (final Exception ex) {
-                consumerClosedCountRef.incrementAndGet();
-                //ignore
-            }
-            throw kex;
-        }
-
-        final ConsumerLease lease = new ConsumerLease() {
-
-            private volatile boolean poisoned = false;
-            private volatile boolean closed = false;
-
-            @Override
-            public ConsumerRecords<byte[], byte[]> poll() {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
-                    if (records.isEmpty()) {
-                        unproductivePollCountRef.incrementAndGet();
-                    } else {
-                        productivePollCountRef.incrementAndGet();
-                    }
-                    return records;
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-                try {
-                    kafkaConsumer.commitSync(offsets);
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void close() {
-                if (closed) {
-                    return;
-                }
-                if (poisoned || activeLeaseCount.get() > maxLeases) {
-                    closeConsumer(kafkaConsumer);
-                    activeLeaseCount.decrementAndGet();
-                    closed = true;
-                } else {
-                    final boolean added;
-                    synchronized (ConsumerPool.this) {
-                        added = consumerLeases.offer(this);
-                    }
-                    if (!added) {
-                        closeConsumer(kafkaConsumer);
-                        activeLeaseCount.decrementAndGet();
-                    }
-                }
-            }
-
-            @Override
-            public void poison() {
-                poisoned = true;
-            }
-        };
-        activeLeaseCount.incrementAndGet();
-        return lease;
-    }
-
     /**
-     * Closes all consumers in the pool. Can be safely recalled.
+     * Closes all consumers in the pool. Can be safely called repeatedly.
      */
     @Override
     public void close() {
-        final List<ConsumerLease> leases = new ArrayList<>();
-        synchronized (this) {
-            ConsumerLease lease = null;
-            while ((lease = consumerLeases.poll()) != null) {
-                leases.add(lease);
-            }
-        }
-        for (final ConsumerLease lease : leases) {
-            lease.poison();
-            lease.close();
-        }
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
     }
 
     private void closeConsumer(final Consumer consumer) {
+        consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();
         } catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
 
         try {
             consumer.close();
-            consumerClosedCountRef.incrementAndGet();
         } catch (Exception e) {
-            consumerClosedCountRef.incrementAndGet();
             logger.warn("Failed while closing " + consumer, e);
         }
     }
 
     PoolStats getPoolStats() {
-        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
     }
 
     static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
         final long consumerCreatedCount;
         final long consumerClosedCount;
         final long leasesObtainedCount;
-        final long productivePollCount;
-        final long unproductivePollCount;
 
         PoolStats(
                 final long consumerCreatedCount,
                 final long consumerClosedCount,
-                final long leasesObtainedCount,
-                final long productivePollCount,
-                final long unproductivePollCount
+                final long leasesObtainedCount
         ) {
             this.consumerCreatedCount = consumerCreatedCount;
             this.consumerClosedCount = consumerClosedCount;
             this.leasesObtainedCount = leasesObtainedCount;
-            this.productivePollCount = productivePollCount;
-            this.unproductivePollCount = unproductivePollCount;
         }
 
         @Override
         public String toString() {
             return "Created Consumers [" + consumerCreatedCount + "]\n"
                     + "Closed Consumers  [" + consumerClosedCount + "]\n"
-                    + "Leases Obtained   [" + leasesObtainedCount + "]\n"
-                    + "Productive Polls  [" + productivePollCount + "]\n"
-                    + "Unproductive Polls  [" + unproductivePollCount + "]\n";
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
         }
 
     }
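
The close(boolean) logic in SimpleConsumerLease above follows a common pooling pattern: try to hand the resource back to a bounded queue and destroy it only when the caller forces a close, the resource has been poisoned, or the queue is already full. A condensed sketch of that pattern follows; the Resource type, class name, and queue capacity are invented for illustration, with Resource standing in for the wrapped KafkaConsumer.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Illustrative pattern only; not code from the patch.
    class ReturnToPoolExample {

        interface Resource extends AutoCloseable {
            boolean isPoisoned();
            @Override
            void close();
        }

        private final BlockingQueue<Resource> pooled = new ArrayBlockingQueue<>(10);

        void release(final Resource resource, final boolean forceClose) {
            // offer() never blocks: if the queue is full the resource is simply destroyed,
            // which naturally caps the number of live resources at the queue capacity.
            if (forceClose || resource.isPoisoned() || !pooled.offer(resource)) {
                resource.close();
            }
        }
    }
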

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3ae7495..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
 
     private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters.");
+
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
     static final String KAFKA_KEY = "kafka.key";
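
The HEX_ENCODING description above refers to rendering a raw message key as uppercase hexadecimal characters. The small illustration below is not the utility the processors use; it only shows the expected output format, which matches the "6B657931" attribute value used in PublishKafkaTest later in this patch.

    import java.nio.charset.StandardCharsets;

    // Illustration of uppercase hex key encoding; helper and class names are invented.
    public class HexKeyExample {

        static String toHex(final byte[] key) {
            final StringBuilder sb = new StringBuilder(key.length * 2);
            for (final byte b : key) {
                // Mask to an unsigned int so negative bytes format correctly.
                sb.append(String.format("%02X", b & 0xFF));
            }
            return sb.toString();
        }

        public static void main(final String[] args) {
            // "key1" encodes to 6B657931.
            System.out.println(toHex("key1".getBytes(StandardCharsets.UTF_8)));
        }
    }
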

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index 18f3018..83688b7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -55,6 +55,8 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -104,10 +106,6 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
             "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")


[2/3] nifi git commit: NIFI-2732 ensure session and consumer aligned and has registered rebalance listener. Make consumption far more memory and process efficient, fixed extraneous getbundled call

Posted by bb...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index a85563d..6fd9053 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,104 +16,36 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
-    static class MockConsumerPool extends ConsumerPool {
-
-        final int actualMaxLeases;
-        final List<String> actualTopics;
-        final Map<String, String> actualKafkaProperties;
-        boolean throwKafkaExceptionOnPoll = false;
-        boolean throwKafkaExceptionOnCommit = false;
-        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
-        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
-        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
-        boolean wasConsumerLeasePoisoned = false;
-        boolean wasConsumerLeaseClosed = false;
-        boolean wasPoolClosed = false;
-
-        public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
-            super(maxLeases, topics, kafkaProperties, null);
-            actualMaxLeases = maxLeases;
-            actualTopics = topics;
-            actualKafkaProperties = kafkaProperties;
-        }
-
-        @Override
-        public ConsumerLease obtainConsumer() {
-            return new ConsumerLease() {
-                @Override
-                public ConsumerRecords<byte[], byte[]> poll() {
-                    if (throwKafkaExceptionOnPoll) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
-                    return (records == null) ? ConsumerRecords.empty() : records;
-                }
-
-                @Override
-                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
-                    if (throwKafkaExceptionOnCommit) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    actualCommitOffsets = offsets;
-                }
-
-                @Override
-                public void poison() {
-                    wasConsumerLeasePoisoned = true;
-                }
-
-                @Override
-                public void close() {
-                    wasConsumerLeaseClosed = true;
-                }
-            };
-        }
-
-        @Override
-        public void close() {
-            wasPoolClosed = true;
-        }
-
-        void resetState() {
-            throwKafkaExceptionOnPoll = false;
-            throwKafkaExceptionOnCommit = false;
-            nextPlannedRecordsQueue = null;
-            nextExpectedCommitOffsets = null;
-            wasConsumerLeasePoisoned = false;
-            wasConsumerLeaseClosed = false;
-            wasPoolClosed = false;
-        }
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
 
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
     }
 
     @Test
@@ -174,31 +106,14 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
         ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -207,69 +122,29 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-        } else {
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        }
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
     @Test
-    public void validateGetLotsOfMessages() throws Exception {
-        String groupName = "validateGetLotsOfMessages";
-
-        final byte[][] firstPassValues = new byte[10010][1];
-        for (final byte[] value : firstPassValues) {
-            value[0] = 0x12;
-        }
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
 
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
         ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -278,352 +153,15 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
-        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
-        assertEquals(1, mockPool.actualCommitOffsets.size());
-        assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-        for (final byte[] rawRecord : rawRecords) {
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-
-        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
-            final byte[] key = entry.getKey();
-            final byte[] rawRecord = entry.getValue();
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        for (final ConsumerRecords<byte[], byte[]> rec : records) {
-            rec.partitions().stream().forEach((part) -> {
-                final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
-                if (map.get(part) != null) {
-                    throw new IllegalStateException("already have that topic/partition in the record map");
-                }
-                map.put(part, conRecs);
-            });
-        }
-        return new ConsumerRecords<>(map);
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
-        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues),
-                createConsumerRecords("bar", 1, 1L, secondPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertEquals(2, mockPool.actualCommitOffsets.size());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-    }
-
-    @Test
-    public void validatePollException() throws Exception {
-        String groupName = "validatePollException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnPoll = true;
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(0, flowFiles.size());
-        assertNull(null, mockPool.actualCommitOffsets);
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateCommitOffsetException() throws Exception {
-        String groupName = "validateCommitOffsetException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnCommit = true;
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(1, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertNull(null, mockPool.actualCommitOffsets);
-    }
-
-    @Test
-    public void validateUtf8Key() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateHexKey() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
 }
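
The verify(...) expectations in the rewritten tests above outline the lease lifecycle the processor is expected to drive: obtain a lease bound to the process session, poll while the lease indicates polling may continue, then commit and close. The following is only a sketch inferred from those expectations, not a copy of the processor's onTrigger; the wrapper class and method names are invented.

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.processor.ProcessSession;

    // Sketch of the consume loop implied by the Mockito expectations above.
    class ConsumeLoopSketch {

        void consumeOnce(final ConsumerPool pool, final ProcessSession session) throws Exception {
            try (final ConsumerLease lease = pool.obtainConsumer(session)) {
                while (lease.continuePolling()) {
                    lease.poll();      // records are written directly into the bound session
                }
                lease.commit();        // commit the NiFi session and the Kafka offsets
            }                          // close() returns the consumer to the pool or destroys it
        }
    }
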

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
     Consumer<byte[], byte[]> consumer = null;
+    ProcessSession mockSession = null;
+    ProvenanceReporter mockReporter = null;
+    ConsumerPool testPool = null;
+    ConsumerPool testDemarcatedPool = null;
     ComponentLog logger = null;
 
     @Before
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
-    }
-
-    @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
             }
         };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
 
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
             lease.poll();
-            lease.commitOffsets(Collections.emptyMap());
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
-        assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(1, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
+        assertEquals(4, stats.leasesObtainedCount);
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
 
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
-                lease.commitOffsets(Collections.emptyMap());
             }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(100, stats.leasesObtainedCount);
-        assertEquals(10000, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
 
     @Test
-    public void validatePoolConsumerFails() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
             lease.poll();
-            fail();
-        } catch (final KafkaException ke) {
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
 
+            }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(0, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af0d343..b3f1bd1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -358,7 +358,7 @@ public class PublishKafkaTest {
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
         runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
 
         final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 0a3fe5d..0b8a752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -25,13 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,17 +46,18 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. "
         + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
         + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time"
         + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
-@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
-        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
 public class ConsumeKafka extends AbstractProcessor {
 
-    private static final long TWO_MB = 2L * 1024L * 1024L;
-
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
     static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
             .name("max.poll.records")
             .displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka extends AbstractProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
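A quick illustration (not part of the diff) of how the "1 secs" default above resolves at runtime; the same asTimePeriod call appears in createConsumerPool further down, and the surrounding ProcessContext is assumed to be in scope:

    // Sketch only: NiFi time-period strings such as "1 secs" are converted to a
    // numeric duration, so the default yields 1000 when asked for milliseconds.
    final long maxUncommittedMillis =
            context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS); // "1 secs" -> 1000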
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka extends AbstractProcessor {
     static final List<PropertyDescriptor> DESCRIPTORS;
     static final Set<Relationship> RELATIONSHIPS;
 
-    private volatile byte[] demarcatorBytes = null;
     private volatile ConsumerPool consumerPool = null;
 
     static {
@@ -165,6 +167,7 @@ public class ConsumeKafka extends AbstractProcessor {
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
         RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     }
@@ -179,16 +182,8 @@ public class ConsumeKafka extends AbstractProcessor {
         return DESCRIPTORS;
     }
 
-    @OnScheduled
-    public void prepareProcessing(final ProcessContext context) {
-        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
-                : null;
-    }
-
     @OnStopped
     public void close() {
-        demarcatorBytes = null;
         final ConsumerPool pool = consumerPool;
         consumerPool = null;
         if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka extends AbstractProcessor {
             return pool;
         }
 
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
+                ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
         final Map<String, String> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        final String topicListing = context.getProperty(ConsumeKafka.TOPICS).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         for (final String topic : topicListing.split(",", 100)) {
             final String trimmedName = topic.trim();
@@ -225,213 +232,40 @@ public class ConsumeKafka extends AbstractProcessor {
                 topics.add(trimmedName);
             }
         }
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
-    }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
 
-    protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-        return new ConsumerPool(maxLeases, topics, props, log);
+        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
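Because createConsumerPool is protected, a test can swap in a canned pool and exercise onTrigger without a live Kafka cluster. A minimal sketch under that assumption; the class and field names below are hypothetical and not part of this change:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessContext;

    // Hypothetical test helper living in the same package as ConsumeKafka.
    class ConsumeKafkaWithStubbedPool extends ConsumeKafka {

        private final ConsumerPool stubPool;

        ConsumeKafkaWithStubbedPool(final ConsumerPool stubPool) {
            this.stubPool = stubPool;
        }

        @Override
        protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
            return stubPool; // bypass property parsing and real consumer creation
        }
    }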
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final long startTimeNanos = System.nanoTime();
         final ConsumerPool pool = getConsumerPool(context);
         if (pool == null) {
             context.yield();
             return;
         }
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
-        try (final ConsumerLease lease = pool.obtainConsumer()) {
-            try {
-                if (lease == null) {
-                    context.yield();
-                    return;
-                }
 
-                final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
-                if (!foundData) {
-                    session.rollback();
-                    return;
-                }
-
-                writeSessionData(context, session, partitionRecordMap, startTimeNanos);
-                //At-least once commit handling (if order is reversed it is at-most once)
-                session.commit();
-                commitOffsets(lease, partitionRecordMap);
-            } catch (final KafkaException ke) {
-                lease.poison();
-                getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+        try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+            if (lease == null) {
                 context.yield();
-                session.rollback();
-            }
-        }
-    }
-
-    private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
-        final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
-        partitionRecordMap.entrySet().stream()
-                .filter(entry -> !entry.getValue().isEmpty())
-                .forEach((entry) -> {
-                    long maxOffset = entry.getValue().stream()
-                            .mapToLong(record -> record.offset())
-                            .max()
-                            .getAsLong();
-                    partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
-                });
-        lease.commitOffsets(partOffsetMap);
-    }
-
-    private void writeSessionData(
-            final ProcessContext context, final ProcessSession session,
-            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
-            final long startTimeNanos) {
-        if (demarcatorBytes != null) {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .forEach(entry -> {
-                        writeData(context, session, entry.getValue(), startTimeNanos);
-                    });
-        } else {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .flatMap(entry -> entry.getValue().stream())
-                    .forEach(record -> {
-                        writeData(context, session, Collections.singletonList(record), startTimeNanos);
-                    });
-        }
-    }
-
-    private String encodeKafkaKey(final byte[] key, final String encoding) {
-        if (key == null) {
-            return null;
-        }
-
-        if (HEX_ENCODING.getValue().equals(encoding)) {
-            return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
-            return new String(key, StandardCharsets.UTF_8);
-        } else {
-            return null;    // won't happen because it is guaranteed by the Allowable Values
-        }
-    }
-
-    private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
-        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
-        final String offset = String.valueOf(firstRecord.offset());
-        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
-        final String topic = firstRecord.topic();
-        final String partition = String.valueOf(firstRecord.partition());
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, out -> {
-            boolean useDemarcator = false;
-            for (final ConsumerRecord<byte[], byte[]> record : records) {
-                if (useDemarcator) {
-                    out.write(demarcatorBytes);
-                }
-                out.write(record.value());
-                useDemarcator = true;
+                return;
             }
-        });
-        final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyValue != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
-        }
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
-        if (records.size() > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
-        }
-        flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
-        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(
-                context.getProperty(SECURITY_PROTOCOL).getValue(),
-                context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
-                topic);
-        session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
-        this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
-                new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
-        session.transfer(flowFile, REL_SUCCESS);
-    }
-
-    /**
-     * Populates the given partitionRecordMap with new records until we poll
-     * that returns no records or until we have enough data. It is important to
-     * ensure we keep items grouped by their topic and partition so that when we
-     * bundle them we bundle them intelligently and so that we can set offsets
-     * properly even across multiple poll calls.
-     */
-    private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
-        final long startNanos = System.nanoTime();
-        boolean foundData = false;
-        ConsumerRecords<byte[], byte[]> records;
-        final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-
-        do {
-            records = lease.poll();
-
-            for (final TopicPartition partition : records.partitions()) {
-                List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
-                if (currList == null) {
-                    currList = new ArrayList<>();
-                    partitionRecordMap.put(partition, currList);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
                 }
-                currList.addAll(records.records(partition));
-                if (currList.size() > 0) {
-                    foundData = true;
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
                 }
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
             }
-            //If we received data and we still want to get more
-        } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
-        return foundData;
-    }
-
-    /**
-     * Determines if we have enough data as-is and should move on.
-     *
-     * @return true if we've been gathering for more than 500 ms or if we're
-     * demarcating and have more than 50 flowfiles worth or if we're per message
-     * and have more than 2000 flowfiles or if totalMessageSize is greater than
-     * two megabytes; false otherwise
-     *
-     * Implementation note: 500 millis and 5 MB are magic numbers. These may
-     * need to be tuned. They get at how often offsets will get committed to
-     * kafka relative to how many records will get buffered into memory in a
-     * poll call before writing to repos.
-     */
-    private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) {
-
-        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
-        if (durationMillis > 500) {
-            return true;
-        }
-
-        int topicPartitionsFilled = 0;
-        int totalRecords = 0;
-        long totalRecordSize = 0;
-
-        for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
-            if (!recordList.isEmpty()) {
-                topicPartitionsFilled++;
-            }
-            totalRecords += recordList.size();
-            for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
-                totalRecordSize += rec.value().length;
-            }
-        }
-
-        if (demarcatorBytes != null && demarcatorBytes.length > 0) {
-            return topicPartitionsFilled > 50;
-        } else if (totalRecordSize > TWO_MB) {
-            return true;
-        } else {
-            return totalRecords > maxRecords;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..5b8ba1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
  * the lease will be returned to the pool for future use by others. A given
 * lease may only belong to a single thread at a time.
  */
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalFlowFiles = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.logger = logger;
+    }
+
+    /**
+     * clears out internal state elements excluding session and consumer as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalFlowFiles = 0;
+    }
+
+    /**
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We'll simply take this to mean that
+     * we need to quickly commit what we've got and will return the consumer to
+     * the pool. This method will be called during the poll() method call of
+     * this class and will be called by the same thread calling poll according
+     * to the Kafka API docs. After this method executes the session and kafka
+     * offsets are committed and this lease is closed.
+     *
+     * @param partitions partitions being reassigned
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
+        commit();
+    }
 
     /**
-     * Executes a poll on the underlying Kafka Consumer.
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it as
+     * by this point we've committed what we've got and moved on.
      *
-     * @return ConsumerRecords retrieved in the poll.
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @param partitions topic partition set being reassigned
      */
-    ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
+     */
+    void poll() {
+        /**
+         * Implementation note: If we take too long (30 secs?) between Kafka
+         * poll calls and our own record processing to any subsequent poll calls
+         * or the commit we can run into a situation where the commit will
+         * succeed to the session but fail on committing offsets. This is
+         * apparently different from the Kafka scenario of electing to rebalance
+         * for other reasons, but in this case it is due to a session timeout. It
+         * appears Kafka KIP-62 aims to offer more control over the meaning of
+         * various timeouts. If we do run into this case it could result in
+         * duplicates.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
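As an illustration of the timeout interplay described in the note above (not something this change configures), the standard Kafka 0.9 consumer settings involved can be supplied alongside the properties the pool already builds; the values shown are placeholders:

    // Sketch: a longer session timeout narrows the window in which the NiFi session
    // commit succeeds but the subsequent offset commit fails. Values must stay within
    // the broker's group.min.session.timeout.ms / group.max.session.timeout.ms bounds.
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");   // 0.9 default
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000"); // keep well below the session timeout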
 
     /**
      * Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
      * kafka client to collect more data from Kafka before committing the
      * offsets.
      *
-     * @param offsets offsets
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @return false if we didn't do anything and the caller should probably
+     * yield; true if we committed new data
+     *
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the nifi session then the offsets means we have an at
+             * least once guarantee here. If we reversed the order we'd have at
+             * most once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+            kafkaConsumer.commitSync(uncommittedOffsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
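For contrast with the ordering chosen above, this sketch shows what reversing the two commits would mean; it is illustrative only and not part of this change:

    // Reversing the order flips the guarantee from at-least-once to at-most-once:
    // a crash after the offset commit but before the session commit loses data
    // instead of duplicating it.
    kafkaConsumer.commitSync(uncommittedOffsetsMap); // offsets first...
    getProcessSession().commit();                    // ...then the NiFi session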
+
+    /**
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator then we're writing individual flow files
+     * per Kafka message, and therefore we must be very mindful of memory usage for
+     * the flow file objects (not their content) being held in memory. The
+     * content of kafka messages will be written to the content repository
+     * immediately upon each poll call but we must still be mindful of how much
+     * memory can be used in each poll call. We will indicate that we should
+     * stop polling if our last poll call produced no new results, if we've
+     * been polling and processing data longer than the specified maximum polling
+     * time, if we have reached our specified max flow file limit, or if a
+     * rebalance has been initiated for one of the partitions we're watching;
+     * otherwise true.
+     *
+     * @return true if should keep polling; false otherwise
+     */
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalFlowFiles < 15000; //admittedly a magic number - good candidate for processor property
+        }
+    }
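As the inline comment above suggests, the 15000 ceiling is a candidate for a processor property. A rough sketch using the same builder pattern as the existing descriptors; the name, display name, and default below are hypothetical:

    // Hypothetical, not part of this commit: expose the per-lease flow file ceiling
    // so operators can trade memory pressure against commit frequency.
    static final PropertyDescriptor MAX_FLOWFILES_PER_LEASE = new PropertyDescriptor.Builder()
            .name("max-flowfiles-per-lease")
            .displayName("Max FlowFiles Per Lease")
            .description("Maximum number of FlowFiles a single consumer lease may create before it stops polling and commits.")
            .required(false)
            .defaultValue("15000")
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .build();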
+
+    /**
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once closed the session will be rolled back and the
+     * pool should destroy the underlying consumer. This is useful if, due to
+     * external reasons such as the processor no longer being scheduled, this
+     * lease should be terminated immediately.
      */
-    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+    private void poison() {
+        poisoned = true;
+    }
 
     /**
-     * Notifies that this lease is poisoned and should not be reused.
+     * @return true if this lease has been poisoned; false otherwise
      */
-    void poison();
+    boolean isPoisoned() {
+        return poisoned;
+    }
 
     /**
-     * Notifies that this lease is to be returned. The pool may optionally reuse
-     * this lease with another client. No further references by the caller
-     * should occur after calling close.
+     * Method that is intended to be overridden by the pool that created
+     * this ConsumerLease object. It should ensure that the session given to
+     * create this lease is rolled back and that the underlying Kafka consumer
+     * is either returned to the pool for continued use or destroyed if this
+     * lease has been poisoned. It can only be called once. Calling it more than
+     * once can result in undefined and non-threadsafe behavior.
      */
     @Override
-    void close();
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+                //write records to content repository and session
+                if (demarcatorBytes == null) {
+                    totalFlowFiles += messages.size();
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                } else {
+                    writeData(getProcessSession(), messages, partition);
+                }
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
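A quick worked example of the two encoding options; the key bytes are made up:

    // Given a two-byte key 0x4E 0x69 ("Ni"):
    final byte[] key = new byte[]{0x4E, 0x69};
    encodeKafkaKey(key, HEX_ENCODING.getValue());   // -> "4E69" (uppercase hex from DatatypeConverter)
    encodeKafkaKey(key, UTF8_ENCODING.getValue());  // -> "Ni"
    encodeKafkaKey(null, HEX_ENCODING.getValue());  // -> null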
+
+    private Collection<FlowFile> getBundles() {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            populateAttributes(tracker);
+            flowFiles.add(tracker.flowFile);
+        }
+        return flowFiles;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        flowFile = session.write(flowFile, out -> {
+            out.write(record.value());
+        });
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+        final boolean demarcateFirstRecord;
+        BundleTracker tracker = bundleMap.get(topicPartition);
+        FlowFile flowFile;
+        if (tracker == null) {
+            tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+            flowFile = session.create();
+            tracker.updateFlowFile(flowFile);
+            demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+        } else {
+            demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+        }
+        flowFile = tracker.flowFile;
+        tracker.incrementRecordCount(records.size());
+        flowFile = session.append(flowFile, out -> {
+            boolean useDemarcator = demarcateFirstRecord;
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                if (useDemarcator) {
+                    out.write(demarcatorBytes);
+                }
+                out.write(record.value());
+                useDemarcator = true;
+            }
+        });
+        tracker.updateFlowFile(flowFile);
+        bundleMap.put(topicPartition, tracker);
+    }
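To make the append behavior concrete, here is what one bundle looks like; the records and demarcator are hypothetical:

    // With demarcatorBytes = "\n" and records "a", "b", "c" arriving for the same
    // TopicPartition across one or more poll calls within a single lease, the bundled
    // FlowFile content is:
    //   a\nb\nc
    // The demarcator is written between records only, never before the first record,
    // because demarcateFirstRecord is false on the first append for a partition.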
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
 
 }