Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 19:49:11 UTC

[2/3] nifi git commit: NIFI-2732 Ensure the session and consumer are aligned and have a registered rebalance listener. Make consumption far more memory- and process-efficient; fix an extraneous getbundled call.
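
For orientation before the diffs: this patch moves record handling into a ConsumerLease that is bound to the NiFi ProcessSession, and the reworked tests verify the calling pattern continuePolling/poll/commit/close. Below is a minimal, self-contained sketch of that pattern under my reading of the tests; the Lease interface and the in-line implementation are hypothetical stand-ins, not NiFi's actual ConsumerLease.

public class ConsumeLoopSketch {

    // Hypothetical stand-in mirroring the lease calls the tests in this patch verify.
    public interface Lease extends AutoCloseable {
        boolean continuePolling(); // bounded by max uncommitted time / max poll records
        void poll();               // hands received records to the bound ProcessSession
        boolean commit();          // assumed to commit the session, then the Kafka offsets
        @Override
        void close();
    }

    static void consumeOnce(final Lease lease) {
        try (final Lease l = lease) {
            while (l.continuePolling()) {
                l.poll();
            }
            l.commit(); // a false return marks the error path exercised in validateGetErrorMessages
        }
    }

    public static void main(String[] args) {
        consumeOnce(new Lease() {
            private int polls = 0;
            @Override public boolean continuePolling() { return polls < 2; }
            @Override public void poll() { polls++; System.out.println("poll " + polls); }
            @Override public boolean commit() { System.out.println("commit"); return true; }
            @Override public void close() { System.out.println("close"); }
        });
    }
}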

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index a85563d..6fd9053 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,104 +16,36 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
-    static class MockConsumerPool extends ConsumerPool {
-
-        final int actualMaxLeases;
-        final List<String> actualTopics;
-        final Map<String, String> actualKafkaProperties;
-        boolean throwKafkaExceptionOnPoll = false;
-        boolean throwKafkaExceptionOnCommit = false;
-        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
-        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
-        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
-        boolean wasConsumerLeasePoisoned = false;
-        boolean wasConsumerLeaseClosed = false;
-        boolean wasPoolClosed = false;
-
-        public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
-            super(maxLeases, topics, kafkaProperties, null);
-            actualMaxLeases = maxLeases;
-            actualTopics = topics;
-            actualKafkaProperties = kafkaProperties;
-        }
-
-        @Override
-        public ConsumerLease obtainConsumer() {
-            return new ConsumerLease() {
-                @Override
-                public ConsumerRecords<byte[], byte[]> poll() {
-                    if (throwKafkaExceptionOnPoll) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
-                    return (records == null) ? ConsumerRecords.empty() : records;
-                }
-
-                @Override
-                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
-                    if (throwKafkaExceptionOnCommit) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    actualCommitOffsets = offsets;
-                }
-
-                @Override
-                public void poison() {
-                    wasConsumerLeasePoisoned = true;
-                }
-
-                @Override
-                public void close() {
-                    wasConsumerLeaseClosed = true;
-                }
-            };
-        }
-
-        @Override
-        public void close() {
-            wasPoolClosed = true;
-        }
-
-        void resetState() {
-            throwKafkaExceptionOnPoll = false;
-            throwKafkaExceptionOnCommit = false;
-            nextPlannedRecordsQueue = null;
-            nextExpectedCommitOffsets = null;
-            wasConsumerLeasePoisoned = false;
-            wasConsumerLeaseClosed = false;
-            wasPoolClosed = false;
-        }
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
 
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
     }
 
     @Test
@@ -174,31 +106,14 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
         ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -207,69 +122,29 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-        } else {
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        }
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
     @Test
-    public void validateGetLotsOfMessages() throws Exception {
-        String groupName = "validateGetLotsOfMessages";
-
-        final byte[][] firstPassValues = new byte[10010][1];
-        for (final byte[] value : firstPassValues) {
-            value[0] = 0x12;
-        }
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
 
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
         ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -278,352 +153,15 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
-        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
-        assertEquals(1, mockPool.actualCommitOffsets.size());
-        assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-        for (final byte[] rawRecord : rawRecords) {
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-
-        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
-            final byte[] key = entry.getKey();
-            final byte[] rawRecord = entry.getValue();
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        for (final ConsumerRecords<byte[], byte[]> rec : records) {
-            rec.partitions().stream().forEach((part) -> {
-                final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
-                if (map.get(part) != null) {
-                    throw new IllegalStateException("already have that topic/partition in the record map");
-                }
-                map.put(part, conRecs);
-            });
-        }
-        return new ConsumerRecords<>(map);
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
-        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues),
-                createConsumerRecords("bar", 1, 1L, secondPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertEquals(2, mockPool.actualCommitOffsets.size());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-    }
-
-    @Test
-    public void validatePollException() throws Exception {
-        String groupName = "validatePollException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnPoll = true;
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(0, flowFiles.size());
-        assertNull(null, mockPool.actualCommitOffsets);
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateCommitOffsetException() throws Exception {
-        String groupName = "validateCommitOffsetException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnCommit = true;
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(1, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertNull(null, mockPool.actualCommitOffsets);
-    }
-
-    @Test
-    public void validateUtf8Key() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateHexKey() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
 }
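
The rewritten test above replaces the hand-rolled MockConsumerPool with Mockito consecutive-return stubbing plus interaction verification. A small, self-contained illustration of that stubbing and verification style; LeaseLike is a hypothetical stand-in used only to keep the example runnable outside NiFi.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

public class ConsecutiveStubbingExample {

    public interface LeaseLike {
        boolean continuePolling();
        void poll();
    }

    public static void main(String[] args) {
        final LeaseLike lease = mock(LeaseLike.class);
        // First call returns true, second true, third (and any later call) false.
        when(lease.continuePolling()).thenReturn(true, true, false);

        while (lease.continuePolling()) {
            lease.poll();
        }

        verify(lease, times(3)).continuePolling(); // condition checked three times
        verify(lease, times(2)).poll();            // loop body ran twice
        verifyNoMoreInteractions(lease);
    }
}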

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
     Consumer<byte[], byte[]> consumer = null;
+    ProcessSession mockSession = null;
+    ProvenanceReporter mockReporter = null;
+    ConsumerPool testPool = null;
+    ConsumerPool testDemarcatedPool = null;
     ComponentLog logger = null;
 
     @Before
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
-    }
-
-    @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
             }
         };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
 
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
             lease.poll();
-            lease.commitOffsets(Collections.emptyMap());
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
-        assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(1, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
+        assertEquals(4, stats.leasesObtainedCount);
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
 
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
-                lease.commitOffsets(Collections.emptyMap());
             }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(100, stats.leasesObtainedCount);
-        assertEquals(10000, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
 
     @Test
-    public void validatePoolConsumerFails() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
             lease.poll();
-            fail();
-        } catch (final KafkaException ke) {
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
 
+            }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(0, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
 }
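
The static createConsumerRecords helper that now sits at the bottom of this test can be reused by other tests in the same package. A hedged usage sketch; the class name, topic, offsets, and values here are arbitrary.

package org.apache.nifi.processors.kafka.pubsub;

import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerRecordsFixtureExample {

    public static void main(String[] args) {
        final byte[][] values = new byte[][]{
            "Hello-1".getBytes(StandardCharsets.UTF_8),
            "Hello-2".getBytes(StandardCharsets.UTF_8)
        };
        // Two records on partition 0 of topic "nifi", offsets 42 and 43.
        final ConsumerRecords<byte[], byte[]> recs =
                ConsumerPoolTest.createConsumerRecords("nifi", 0, 42L, values);
        System.out.println("record count: " + recs.count()); // prints 2
    }
}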

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af0d343..b3f1bd1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -358,7 +358,7 @@ public class PublishKafkaTest {
         TestRunner runner = TestRunners.newTestRunner(putKafka);
         runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
         runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
 
         final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
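
The attribute value above, "6B657931", is the uppercase hex encoding of the UTF-8 bytes of "key1". A quick standalone check of that encoding using javax.xml.bind.DatatypeConverter, which these processors already use for hex-encoded keys; the class name is only for illustration.

import java.nio.charset.StandardCharsets;
import javax.xml.bind.DatatypeConverter;

public class HexKeyCheck {

    public static void main(String[] args) {
        final byte[] key = DatatypeConverter.parseHexBinary("6B657931");
        System.out.println(new String(key, StandardCharsets.UTF_8)); // prints: key1
        System.out.println(DatatypeConverter.printHexBinary(key));   // prints: 6B657931
    }
}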

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 0a3fe5d..0b8a752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -25,13 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,17 +46,18 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. "
         + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
         + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time"
         + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
-@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
-        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
 public class ConsumeKafka extends AbstractProcessor {
 
-    private static final long TWO_MB = 2L * 1024L * 1024L;
-
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
     static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
             .name("max.poll.records")
             .displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka extends AbstractProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka extends AbstractProcessor {
     static final List<PropertyDescriptor> DESCRIPTORS;
     static final Set<Relationship> RELATIONSHIPS;
 
-    private volatile byte[] demarcatorBytes = null;
     private volatile ConsumerPool consumerPool = null;
 
     static {
@@ -165,6 +167,7 @@ public class ConsumeKafka extends AbstractProcessor {
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
         RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     }
@@ -179,16 +182,8 @@ public class ConsumeKafka extends AbstractProcessor {
         return DESCRIPTORS;
     }
 
-    @OnScheduled
-    public void prepareProcessing(final ProcessContext context) {
-        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
-                : null;
-    }
-
     @OnStopped
     public void close() {
-        demarcatorBytes = null;
         final ConsumerPool pool = consumerPool;
         consumerPool = null;
         if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka extends AbstractProcessor {
             return pool;
         }
 
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
+                ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
         final Map<String, String> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        final String topicListing = context.getProperty(ConsumeKafka.TOPICS).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         for (final String topic : topicListing.split(",", 100)) {
             final String trimmedName = topic.trim();
@@ -225,213 +232,40 @@ public class ConsumeKafka extends AbstractProcessor {
                 topics.add(trimmedName);
             }
         }
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
-    }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
 
-    protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-        return new ConsumerPool(maxLeases, topics, props, log);
+        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final long startTimeNanos = System.nanoTime();
         final ConsumerPool pool = getConsumerPool(context);
         if (pool == null) {
             context.yield();
             return;
         }
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
-        try (final ConsumerLease lease = pool.obtainConsumer()) {
-            try {
-                if (lease == null) {
-                    context.yield();
-                    return;
-                }
 
-                final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
-                if (!foundData) {
-                    session.rollback();
-                    return;
-                }
-
-                writeSessionData(context, session, partitionRecordMap, startTimeNanos);
-                //At-least once commit handling (if order is reversed it is at-most once)
-                session.commit();
-                commitOffsets(lease, partitionRecordMap);
-            } catch (final KafkaException ke) {
-                lease.poison();
-                getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+        try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+            if (lease == null) {
                 context.yield();
-                session.rollback();
-            }
-        }
-    }
-
-    private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
-        final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
-        partitionRecordMap.entrySet().stream()
-                .filter(entry -> !entry.getValue().isEmpty())
-                .forEach((entry) -> {
-                    long maxOffset = entry.getValue().stream()
-                            .mapToLong(record -> record.offset())
-                            .max()
-                            .getAsLong();
-                    partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
-                });
-        lease.commitOffsets(partOffsetMap);
-    }
-
-    private void writeSessionData(
-            final ProcessContext context, final ProcessSession session,
-            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
-            final long startTimeNanos) {
-        if (demarcatorBytes != null) {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .forEach(entry -> {
-                        writeData(context, session, entry.getValue(), startTimeNanos);
-                    });
-        } else {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .flatMap(entry -> entry.getValue().stream())
-                    .forEach(record -> {
-                        writeData(context, session, Collections.singletonList(record), startTimeNanos);
-                    });
-        }
-    }
-
-    private String encodeKafkaKey(final byte[] key, final String encoding) {
-        if (key == null) {
-            return null;
-        }
-
-        if (HEX_ENCODING.getValue().equals(encoding)) {
-            return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
-            return new String(key, StandardCharsets.UTF_8);
-        } else {
-            return null;    // won't happen because it is guaranteed by the Allowable Values
-        }
-    }
-
-    private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
-        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
-        final String offset = String.valueOf(firstRecord.offset());
-        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
-        final String topic = firstRecord.topic();
-        final String partition = String.valueOf(firstRecord.partition());
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, out -> {
-            boolean useDemarcator = false;
-            for (final ConsumerRecord<byte[], byte[]> record : records) {
-                if (useDemarcator) {
-                    out.write(demarcatorBytes);
-                }
-                out.write(record.value());
-                useDemarcator = true;
+                return;
             }
-        });
-        final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyValue != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
-        }
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
-        if (records.size() > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
-        }
-        flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
-        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(
-                context.getProperty(SECURITY_PROTOCOL).getValue(),
-                context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
-                topic);
-        session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
-        this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
-                new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
-        session.transfer(flowFile, REL_SUCCESS);
-    }
-
-    /**
-     * Populates the given partitionRecordMap with new records until we poll
-     * that returns no records or until we have enough data. It is important to
-     * ensure we keep items grouped by their topic and partition so that when we
-     * bundle them we bundle them intelligently and so that we can set offsets
-     * properly even across multiple poll calls.
-     */
-    private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
-        final long startNanos = System.nanoTime();
-        boolean foundData = false;
-        ConsumerRecords<byte[], byte[]> records;
-        final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-
-        do {
-            records = lease.poll();
-
-            for (final TopicPartition partition : records.partitions()) {
-                List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
-                if (currList == null) {
-                    currList = new ArrayList<>();
-                    partitionRecordMap.put(partition, currList);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
                 }
-                currList.addAll(records.records(partition));
-                if (currList.size() > 0) {
-                    foundData = true;
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
                 }
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
             }
-            //If we received data and we still want to get more
-        } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
-        return foundData;
-    }
-
-    /**
-     * Determines if we have enough data as-is and should move on.
-     *
-     * @return true if we've been gathering for more than 500 ms or if we're
-     * demarcating and have more than 50 flowfiles worth or if we're per message
-     * and have more than 2000 flowfiles or if totalMessageSize is greater than
-     * two megabytes; false otherwise
-     *
-     * Implementation note: 500 millis and 5 MB are magic numbers. These may
-     * need to be tuned. They get at how often offsets will get committed to
-     * kafka relative to how many records will get buffered into memory in a
-     * poll call before writing to repos.
-     */
-    private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) {
-
-        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
-        if (durationMillis > 500) {
-            return true;
-        }
-
-        int topicPartitionsFilled = 0;
-        int totalRecords = 0;
-        long totalRecordSize = 0;
-
-        for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
-            if (!recordList.isEmpty()) {
-                topicPartitionsFilled++;
-            }
-            totalRecords += recordList.size();
-            for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
-                totalRecordSize += rec.value().length;
-            }
-        }
-
-        if (demarcatorBytes != null && demarcatorBytes.length > 0) {
-            return topicPartitionsFilled > 50;
-        } else if (totalRecordSize > TWO_MB) {
-            return true;
-        } else {
-            return totalRecords > maxRecords;
         }
     }
-
 }
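
In condensed form, the consume path above reduces to a lease-driven loop. The
sketch below restates it using only the methods introduced in this commit, with
the exception handling of the real onTrigger trimmed for brevity:

    try (final ConsumerLease lease = pool.obtainConsumer(session)) {
        if (lease == null) {
            context.yield();   // no consumer currently available for this session
            return;
        }
        // pull batches while the processor is scheduled and the lease's time,
        // flowfile-count, and memory bounds say polling is still worthwhile
        while (isScheduled() && lease.continuePolling()) {
            lease.poll();
        }
        // commit the NiFi session first, then the Kafka offsets (at-least-once);
        // a false return means nothing was gathered, so back off with a yield
        if (isScheduled() && !lease.commit()) {
            context.yield();
        }
    }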

http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..5b8ba1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
  * the lease will be returned to the pool for future use by others. A given
  * lease may only belong to a single thread a time.
  */
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalFlowFiles = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.logger = logger;
+    }
+
+    /**
+     * clears out internal state elements excluding session and consumer as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalFlowFiles = 0;
+    }
+
+    /**
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We'll simply take this to mean that
+     * we need to quickly commit what we've got and will return the consumer to
+     * the pool. This method will be called during the poll() method call of
+     * this class and will be called by the same thread calling poll according
+     * to the Kafka API docs. After this method executes the session and kafka
+     * offsets are committed and this lease is closed.
+     *
+     * @param partitions partitions being reassigned
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
+        commit();
+    }
 
     /**
-     * Executes a poll on the underlying Kafka Consumer.
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it as
+     * by this point we've committed what we've got and moved on.
      *
-     * @return ConsumerRecords retrieved in the poll.
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @param partitions topic partition set being reassigned
      */
-    ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
+     */
+    void poll() {
+        /**
+         * Implementation note: If we take too long (30 secs?) between kafka
+         * poll calls (from our own record processing to any subsequent poll call
+         * or the commit) we can run into a situation where the commit will
+         * succeed to the session but fail on committing offsets. This is
+         * apparently different from the Kafka scenario of electing to rebalance
+         * for other reasons, but in this case it is due to a session timeout. It
+         * appears Kafka KIP-62 aims to offer more control over the meaning of
+         * various timeouts. If we do run into this case it could result in
+         * duplicates.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
 
     /**
      * Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
      * kafka client to collect more data from Kafka before committing the
      * offsets.
      *
-     * @param offsets offsets
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @return false if we didn't do anything and the caller should probably
+     * yield; true if we committed new data
+     *
+     */
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the nifi session then the offsets means we have an at
+             * least once guarantee here. If we reversed the order we'd have at
+             * most once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+            kafkaConsumer.commitSync(uncommittedOffsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
+
+    /**
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator then we're writing individual flow files
+     * per kafka message, so we must be very mindful of memory usage for
+     * the flow file objects (not their content) being held in memory. The
+     * content of kafka messages will be written to the content repository
+     * immediately upon each poll call, but we must still be mindful of how much
+     * memory can be used in each poll call. We will indicate that we should
+     * stop polling if our last poll call produced no new results, if we've been
+     * polling and processing data longer than the specified maximum polling
+     * time, if we have reached our specified max flow file limit, or if a
+     * rebalance has been initiated for one of the partitions we're watching;
+     * otherwise true.
+     *
+     * @return true if we should keep polling; false otherwise
+     */
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalFlowFiles < 15000; //admittedly a magic number - a good candidate for a processor property
+        }
+    }
+
+    /**
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once closed the session will be rolled back and the
+     * pool should destroy the underlying consumer. This is useful if, due to
+     * external reasons such as the processor no longer being scheduled, this
+     * lease should be terminated immediately.
      */
-    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+    private void poison() {
+        poisoned = true;
+    }
 
     /**
-     * Notifies that this lease is poisoned and should not be reused.
+     * @return true if this lease has been poisoned; false otherwise
      */
-    void poison();
+    boolean isPoisoned() {
+        return poisoned;
+    }
 
     /**
-     * Notifies that this lease is to be returned. The pool may optionally reuse
-     * this lease with another client. No further references by the caller
-     * should occur after calling close.
+     * Abstract method that is intended to be extended by the pool that created
+     * this ConsumerLease object. It should ensure that the session given to
+     * create this lease is rolled back and that the underlying kafka consumer
+     * is either returned to the pool for continued use or destroyed if this
+     * lease has been poisoned. It can only be called once. Calling it more than
+     * once can result in undefined and non-threadsafe behavior.
      */
     @Override
-    void close();
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+                //write records to content repository and session
+                if (demarcatorBytes == null) {
+                    totalFlowFiles += messages.size();
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                } else {
+                    writeData(getProcessSession(), messages, partition);
+                }
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
+    private Collection<FlowFile> getBundles() {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            populateAttributes(tracker);
+            flowFiles.add(tracker.flowFile);
+        }
+        return flowFiles;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        flowFile = session.write(flowFile, out -> {
+            out.write(record.value());
+        });
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+        final boolean demarcateFirstRecord;
+        BundleTracker tracker = bundleMap.get(topicPartition);
+        FlowFile flowFile;
+        if (tracker == null) {
+            tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+            flowFile = session.create();
+            tracker.updateFlowFile(flowFile);
+            demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+        } else {
+            demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+        }
+        flowFile = tracker.flowFile;
+        tracker.incrementRecordCount(records.size());
+        flowFile = session.append(flowFile, out -> {
+            boolean useDemarcator = demarcateFirstRecord;
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                if (useDemarcator) {
+                    out.write(demarcatorBytes);
+                }
+                out.write(record.value());
+                useDemarcator = true;
+            }
+        });
+        tracker.updateFlowFile(flowFile);
+        bundleMap.put(topicPartition, tracker);
+    }
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
 
 }
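
For reference, here is a minimal sketch of how the pool that hands out these
leases might satisfy the abstract contract above (bind the NiFi session, expose
it via getProcessSession(), and honor the close() semantics). The class name
PooledConsumerLease and the pool hand-back step are illustrative assumptions
only; the actual wiring lives in ConsumerPool, which is outside this hunk:

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.ProcessSession;

    // Illustrative only: one possible lease subclass created by the pool.
    final class PooledConsumerLease extends ConsumerLease {

        private final ProcessSession session;

        PooledConsumerLease(final long maxWaitMillis, final Consumer<byte[], byte[]> consumer,
                final byte[] demarcatorBytes, final String keyEncoding, final String securityProtocol,
                final String bootstrapServers, final ComponentLog logger, final ProcessSession session) {
            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
            this.session = session;
        }

        @Override
        public ProcessSession getProcessSession() {
            return session;
        }

        @Override
        public void close() {
            super.close();       // clears the bundle and offset tracking state
            session.rollback();  // anything already sent out was committed in commit()
            // At this point the owning pool would either take the consumer back for
            // reuse or, if isPoisoned() is true, close and discard it (left as a
            // comment because the pool API is not part of this hunk).
        }
    }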