Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/15 18:51:51 UTC

[12/20] nifi git commit: NIFI-3198: Refactored how PublishKafka and PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in StreamDemarcator. Slight refactoring of consume processors to simplify code.

NIFI-3198: Refactored how PublishKafka and PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in StreamDemarcator. Slight refactoring of consume processors to simplify code.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ed17df50
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ed17df50
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ed17df50

Branch: refs/heads/support/nifi-1.0.x
Commit: ed17df503b6d842a439e549615c2750df6752632
Parents: 1ba7f83
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Dec 14 14:23:21 2016 -0500
Committer: jpercivall <JP...@apache.org>
Committed: Wed Dec 14 16:23:50 2016 -0500

----------------------------------------------------------------------
 .../io/exception/TokenTooLargeException.java    |  26 +
 .../nifi/stream/io/util/StreamDemarcator.java   |  39 +-
 .../stream/io/util/StreamDemarcatorTest.java    |  84 ++-
 .../nifi/util/StandardProcessorTestRunner.java  |  31 +-
 .../java/org/apache/nifi/util/TestRunner.java   |  16 +-
 .../kafka/pubsub/ConsumeKafka_0_10.java         | 315 +++------
 .../processors/kafka/pubsub/ConsumerLease.java  | 367 ++++++++++-
 .../processors/kafka/pubsub/ConsumerPool.java   | 287 ++++----
 .../kafka/pubsub/InFlightMessageTracker.java    | 178 +++++
 .../kafka/pubsub/KafkaProcessorUtils.java       |  37 +-
 .../processors/kafka/pubsub/KafkaPublisher.java | 236 -------
 .../kafka/pubsub/PublishKafka_0_10.java         | 646 +++++++------------
 .../processors/kafka/pubsub/PublishResult.java  |  56 ++
 .../processors/kafka/pubsub/PublisherLease.java | 132 ++++
 .../processors/kafka/pubsub/PublisherPool.java  |  98 +++
 .../kafka/pubsub/PublishingContext.java         | 124 ----
 .../kafka/pubsub/ConsumeKafkaTest.java          | 548 ++--------------
 .../kafka/pubsub/ConsumerPoolTest.java          | 172 +++--
 .../kafka/pubsub/KafkaPublisherTest.java        | 306 ---------
 .../kafka/pubsub/PublishKafkaTest.java          | 375 -----------
 .../kafka/pubsub/PublishingContextTest.java     |  91 ---
 .../kafka/pubsub/StubPublishKafka.java          | 143 ----
 .../pubsub/TestInFlightMessageTracker.java      |  87 +++
 .../kafka/pubsub/TestPublishKafka.java          | 262 ++++++++
 .../kafka/pubsub/TestPublisherLease.java        | 194 ++++++
 .../kafka/pubsub/TestPublisherPool.java         |  68 ++
 .../nifi/processors/kafka/KafkaPublisher.java   |   4 +-
 .../processors/kafka/pubsub/ConsumeKafka.java   | 313 +++------
 .../processors/kafka/pubsub/ConsumerLease.java  | 367 ++++++++++-
 .../processors/kafka/pubsub/ConsumerPool.java   | 288 ++++-----
 .../kafka/pubsub/InFlightMessageTracker.java    | 178 +++++
 .../kafka/pubsub/KafkaProcessorUtils.java       |  37 +-
 .../processors/kafka/pubsub/KafkaPublisher.java | 236 -------
 .../processors/kafka/pubsub/PublishKafka.java   | 641 +++++++-----------
 .../processors/kafka/pubsub/PublishResult.java  |  56 ++
 .../processors/kafka/pubsub/PublisherLease.java | 132 ++++
 .../processors/kafka/pubsub/PublisherPool.java  |  98 +++
 .../kafka/pubsub/PublishingContext.java         | 124 ----
 .../kafka/pubsub/ConsumeKafkaTest.java          | 555 ++--------------
 .../kafka/pubsub/ConsumerPoolTest.java          | 172 +++--
 .../kafka/pubsub/KafkaPublisherTest.java        | 306 ---------
 .../kafka/pubsub/PublishKafkaTest.java          | 375 -----------
 .../kafka/pubsub/PublishingContextTest.java     |  91 ---
 .../kafka/pubsub/StubPublishKafka.java          | 144 -----
 .../pubsub/TestInFlightMessageTracker.java      |  87 +++
 .../kafka/pubsub/TestPublishKafka.java          | 262 ++++++++
 .../kafka/pubsub/TestPublisherLease.java        | 194 ++++++
 .../kafka/pubsub/TestPublisherPool.java         |  68 ++
 48 files changed, 4338 insertions(+), 5308 deletions(-)
----------------------------------------------------------------------
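
A note on the publish-side refactoring before the diff hunks: the old KafkaPublisher/PublishingContext pair is replaced by a pool/lease model (PublisherPool, PublisherLease, PublishResult and InFlightMessageTracker in the listing above). As a rough orientation only, a publishing flow built on such a pool/lease model might look like the sketch below; the method names and signatures are illustrative assumptions, not the actual API, which is defined in the new classes themselves (their hunks appear in other parts of this patch).

    // Hypothetical sketch only -- names and signatures here are assumptions, not the real NiFi API.
    try (final PublisherLease lease = publisherPool.obtainPublisher()) {          // borrow a pooled producer
        for (final FlowFile flowFile : flowFiles) {
            // stream each FlowFile's content to Kafka through the lease
            session.read(flowFile, in -> lease.publish(flowFile, in, topic));
        }
        final PublishResult result = lease.complete();                            // wait for in-flight acks
        session.transfer(result.getSuccessfulFlowFiles(), REL_SUCCESS);
        session.transfer(result.getFailedFlowFiles(), REL_FAILURE);
    }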


http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
new file mode 100644
index 0000000..7024f34
--- /dev/null
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.stream.io.exception;
+
+import java.io.IOException;
+
+public class TokenTooLargeException extends IOException {
+    public TokenTooLargeException(final String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
index 3064f1c..840bdb0 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
@@ -16,9 +16,12 @@
  */
 package org.apache.nifi.stream.io.util;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+
 /**
  * The <code>StreamDemarcator</code> class takes an input stream and demarcates
  * it so it could be read (see {@link #nextToken()}) as individual byte[]
@@ -26,7 +29,7 @@ import java.io.InputStream;
  * stream will be read into a single token which may result in
  * {@link OutOfMemoryError} if stream is too large.
  */
-public class StreamDemarcator {
+public class StreamDemarcator implements Closeable {
 
     private final static int INIT_BUFFER_SIZE = 8192;
 
@@ -95,8 +98,10 @@ public class StreamDemarcator {
     /**
      * Will read the next data token from the {@link InputStream} returning null
      * when it reaches the end of the stream.
+     *
+     * @throws IOException if unable to read from the stream
      */
-    public byte[] nextToken() {
+    public byte[] nextToken() throws IOException {
         byte[] data = null;
         int j = 0;
 
@@ -126,8 +131,10 @@ public class StreamDemarcator {
     /**
      * Will fill the current buffer from current 'index' position, expanding it
      * and or shuffling it if necessary
+     *
+     * @throws IOException if unable to read from the stream
      */
-    private void fill() {
+    private void fill() throws IOException {
         if (this.index >= this.buffer.length) {
             if (this.mark == 0) { // expand
                 byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
@@ -138,23 +145,20 @@ public class StreamDemarcator {
                 System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                 this.index = length;
                 this.mark = 0;
+                this.readAheadLength = length;
             }
         }
 
-        try {
-            int bytesRead;
-            do {
-                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
-            } while (bytesRead == 0);
+        int bytesRead;
+        do {
+            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+        } while (bytesRead == 0);
 
-            if (bytesRead != -1) {
-                this.readAheadLength = this.index + bytesRead;
-                if (this.readAheadLength > this.maxDataSize) {
-                    throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
-                }
+        if (bytesRead != -1) {
+            this.readAheadLength = this.index + bytesRead;
+            if (this.readAheadLength > this.maxDataSize) {
+                throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
             }
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed while reading InputStream", e);
         }
     }
 
@@ -188,4 +192,9 @@ public class StreamDemarcator {
             throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        is.close();
+    }
 }
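
With StreamDemarcator now Closeable and nextToken() declaring IOException, a caller can use try-with-resources and handle oversized tokens explicitly. A minimal sketch based only on the constructor and nextToken() signatures visible in this diff and its tests (the 'in' stream and the process/handle methods are assumed placeholders):

    // Sketch: reading newline-delimited tokens with the updated API.
    // Arguments: (InputStream, delimiter bytes, max token size), as in the tests below.
    try (final StreamDemarcator demarcator = new StreamDemarcator(in, "\n".getBytes(StandardCharsets.UTF_8), 1_000_000)) {
        byte[] token;
        while ((token = demarcator.nextToken()) != null) {    // throws IOException on read failure
            process(token);                                   // placeholder for per-token handling
        }
    } catch (final TokenTooLargeException ttle) {
        handleOversizedToken(ttle);                           // a single token exceeded the configured maximum
    } catch (final IOException ioe) {
        handleReadFailure(ioe);                               // no longer wrapped in IllegalStateException
    }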

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
index 93082a2..66d2668 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -65,7 +66,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateNoDelimiter() {
+    public void validateNoDelimiter() throws IOException {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -76,7 +77,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateNoDelimiterSmallInitialBuffer() {
+    public void validateNoDelimiterSmallInitialBuffer() throws IOException {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 1);
@@ -84,7 +85,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateSingleByteDelimiter() {
+    public void validateSingleByteDelimiter() throws IOException {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -95,7 +96,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateDelimiterAtTheBeginning() {
+    public void validateDelimiterAtTheBeginning() throws IOException {
         String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -106,7 +107,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateEmptyDelimiterSegments() {
+    public void validateEmptyDelimiterSegments() throws IOException {
         String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -117,7 +118,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateSingleByteDelimiterSmallInitialBuffer() {
+    public void validateSingleByteDelimiterSmallInitialBuffer() throws IOException {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2);
@@ -128,7 +129,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiter() {
+    public void validateWithMultiByteDelimiter() throws IOException {
         String data = "foodaabardaabazzz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -139,7 +140,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiterAtTheBeginning() {
+    public void validateWithMultiByteDelimiterAtTheBeginning() throws IOException {
         String data = "daafoodaabardaabazzz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -150,7 +151,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiterSmallInitialBuffer() {
+    public void validateWithMultiByteDelimiterSmallInitialBuffer() throws IOException {
         String data = "foodaabarffdaabazz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1);
@@ -161,7 +162,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteCharsNoDelimiter() {
+    public void validateWithMultiByteCharsNoDelimiter() throws IOException {
         String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -172,7 +173,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() {
+    public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() throws IOException {
         String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 2);
@@ -183,7 +184,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithComplexDelimiter() {
+    public void validateWithComplexDelimiter() throws IOException {
         String data = "THIS IS MY TEXT<MYDELIMITER>THIS IS MY NEW TEXT<MYDELIMITER>THIS IS MY NEWEST TEXT";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
         StreamDemarcator scanner = new StreamDemarcator(is, "<MYDELIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
@@ -193,8 +194,8 @@ public class StreamDemarcatorTest {
         assertNull(scanner.nextToken());
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void validateMaxBufferSize() {
+    @Test(expected = IOException.class)
+    public void validateMaxBufferSize() throws IOException {
         String data = "THIS IS MY TEXT<MY DELIMITER>THIS IS MY NEW TEXT<MY DELIMITER>THIS IS MY NEWEST TEXT";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
         StreamDemarcator scanner = new StreamDemarcator(is, "<MY DELIMITER>".getBytes(StandardCharsets.UTF_8), 20);
@@ -202,7 +203,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() {
+    public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() throws IOException {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, null, 20);
         byte[] b = scanner.nextToken();
@@ -210,7 +211,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateScannerHandlesNegativeOneByteInputs() {
+    public void validateScannerHandlesNegativeOneByteInputs() throws IOException {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
         byte[] b = scanner.nextToken();
@@ -218,10 +219,59 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void verifyScannerHandlesNegativeOneByteDelimiter() {
+    public void verifyScannerHandlesNegativeOneByteDelimiter() throws IOException {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
         assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0, 0 });
         assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0 });
     }
+
+    @Test
+    public void testWithoutTrailingDelimiter() throws IOException {
+        final byte[] inputData = "Larger Message First\nSmall".getBytes(StandardCharsets.UTF_8);
+        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000);
+
+        final byte[] first = scanner.nextToken();
+        final byte[] second = scanner.nextToken();
+        assertNotNull(first);
+        assertNotNull(second);
+
+        assertEquals("Larger Message First", new String(first, StandardCharsets.UTF_8));
+        assertEquals("Small", new String(second, StandardCharsets.UTF_8));
+    }
+
+    @Test
+    public void testOnBufferSplitNoTrailingDelimiter() throws IOException {
+        final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8);
+        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
+
+        final byte[] first = scanner.nextToken();
+        final byte[] second = scanner.nextToken();
+        assertNotNull(first);
+        assertNotNull(second);
+
+        assertArrayEquals(first, new byte[] {'Y', 'e', 's'});
+        assertArrayEquals(second, new byte[] {'N', 'o'});
+    }
+
+    @Test
+    public void testOnBufferSplit() throws IOException {
+        final byte[] inputData = "123\n456\n789".getBytes(StandardCharsets.UTF_8);
+        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
+        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
+
+        final byte[] first = scanner.nextToken();
+        final byte[] second = scanner.nextToken();
+        final byte[] third = scanner.nextToken();
+        assertNotNull(first);
+        assertNotNull(second);
+        assertNotNull(third);
+
+        assertArrayEquals(first, new byte[] {'1', '2', '3'});
+        assertArrayEquals(second, new byte[] {'4', '5', '6'});
+        assertArrayEquals(third, new byte[] {'7', '8', '9'});
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 69118db..e1de2b9 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -368,54 +368,55 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
-    public void enqueue(final Path path) throws IOException {
-        enqueue(path, new HashMap<String, String>());
+    public MockFlowFile enqueue(final Path path) throws IOException {
+        return enqueue(path, new HashMap<String, String>());
     }
 
     @Override
-    public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+    public MockFlowFile enqueue(final Path path, final Map<String, String> attributes) throws IOException {
         final Map<String, String> modifiedAttributes = new HashMap<>(attributes);
         if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
             modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName());
         }
         try (final InputStream in = Files.newInputStream(path)) {
-            enqueue(in, modifiedAttributes);
+            return enqueue(in, modifiedAttributes);
         }
     }
 
     @Override
-    public void enqueue(final byte[] data) {
-        enqueue(data, new HashMap<String, String>());
+    public MockFlowFile enqueue(final byte[] data) {
+        return enqueue(data, new HashMap<String, String>());
     }
 
     @Override
-    public void enqueue(final String data) {
-        enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
+    public MockFlowFile enqueue(final String data) {
+        return enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
     }
 
     @Override
-    public void enqueue(final byte[] data, final Map<String, String> attributes) {
-        enqueue(new ByteArrayInputStream(data), attributes);
+    public MockFlowFile enqueue(final byte[] data, final Map<String, String> attributes) {
+        return enqueue(new ByteArrayInputStream(data), attributes);
     }
 
     @Override
-    public void enqueue(final String data, final Map<String, String> attributes) {
-        enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
+    public MockFlowFile enqueue(final String data, final Map<String, String> attributes) {
+        return enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
     }
 
 
     @Override
-    public void enqueue(final InputStream data) {
-        enqueue(data, new HashMap<String, String>());
+    public MockFlowFile enqueue(final InputStream data) {
+        return enqueue(data, new HashMap<String, String>());
     }
 
     @Override
-    public void enqueue(final InputStream data, final Map<String, String> attributes) {
+    public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
         final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
         MockFlowFile flowFile = session.create();
         flowFile = session.importFrom(data, flowFile);
         flowFile = session.putAllAttributes(flowFile, attributes);
         enqueue(flowFile);
+        return flowFile;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 78d4d00..5d8b494 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -382,7 +382,7 @@ public interface TestRunner {
      * @param path to read content from
      * @throws IOException if unable to read content
      */
-    void enqueue(Path path) throws IOException;
+    MockFlowFile enqueue(Path path) throws IOException;
 
     /**
      * Reads the content from the given {@link Path} into memory and creates a
@@ -393,7 +393,7 @@ public interface TestRunner {
      * @param attributes attributes to use for new flow file
      * @throws IOException if unable to read content
      */
-    void enqueue(Path path, Map<String, String> attributes) throws IOException;
+    MockFlowFile enqueue(Path path, Map<String, String> attributes) throws IOException;
 
     /**
      * Copies the content from the given byte array into memory and creates a
@@ -402,7 +402,7 @@ public interface TestRunner {
      *
      * @param data to enqueue
      */
-    void enqueue(byte[] data);
+    MockFlowFile enqueue(byte[] data);
 
     /**
      * Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes,
@@ -410,7 +410,7 @@ public interface TestRunner {
      *
      * @param data to enqueue
      */
-    void enqueue(String data);
+    MockFlowFile enqueue(String data);
 
     /**
      * Copies the content from the given byte array into memory and creates a
@@ -420,7 +420,7 @@ public interface TestRunner {
      * @param data to enqueue
      * @param attributes to use for enqueued item
      */
-    void enqueue(byte[] data, Map<String, String> attributes);
+    MockFlowFile enqueue(byte[] data, Map<String, String> attributes);
 
     /**
      * Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes,
@@ -429,7 +429,7 @@ public interface TestRunner {
      * @param data to enqueue
      * @param attributes to use for enqueued item
      */
-    void enqueue(String data, Map<String, String> attributes);
+    MockFlowFile enqueue(String data, Map<String, String> attributes);
 
     /**
      * Reads the content from the given {@link InputStream} into memory and
@@ -438,7 +438,7 @@ public interface TestRunner {
      *
      * @param data to source data from
      */
-    void enqueue(InputStream data);
+    MockFlowFile enqueue(InputStream data);
 
     /**
      * Reads the content from the given {@link InputStream} into memory and
@@ -448,7 +448,7 @@ public interface TestRunner {
      * @param data source of data
      * @param attributes to use for flow files
      */
-    void enqueue(InputStream data, Map<String, String> attributes);
+    MockFlowFile enqueue(InputStream data, Map<String, String> attributes);
 
     /**
      * Copies the contents of the given {@link MockFlowFile} into a byte array
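
Because enqueue(...) now returns the created MockFlowFile, a test can hold on to the enqueued FlowFile and assert against it directly. A small sketch, assuming a processor under test named MyProcessor and the standard MockFlowFile assertion helpers:

    // Sketch: using the new MockFlowFile return value in a unit test (MyProcessor is hypothetical).
    final TestRunner runner = TestRunners.newTestRunner(new MyProcessor());
    final MockFlowFile enqueued = runner.enqueue("hello", Collections.singletonMap("attr", "value"));
    enqueued.assertAttributeEquals("attr", "value");   // previously the enqueued FlowFile was not returned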

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 847f8a4..e859f94 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -21,17 +21,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +36,12 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,17 +49,18 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
-@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10 Consumer API. "
+@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API. "
         + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
-        + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time"
-        + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
-@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10"})
+        + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the meantime"
+        + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi processor for sending messages is PublishKafka_0_10.")
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
-        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,22 +72,16 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
 public class ConsumeKafka_0_10 extends AbstractProcessor {
 
-    private static final long FIVE_MB = 5L * 1024L * 1024L;
-
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
-            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma seperated.")
+            .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
             .required(true)
             .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -136,6 +127,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                     + "will result in a single FlowFile which  "
                     + "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
             .build();
+
     static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
             .name("max.poll.records")
             .displayName("Max Poll Records")
@@ -145,6 +137,20 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
+    static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+            .name("max-uncommit-offset-wait")
+            .displayName("Max Uncommitted Time")
+            .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+                    + "This value impacts how often offsets will be committed.  Committing offsets less often increases "
+                    + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+                    + "or JVM restart between commits.  This value is also related to maximum poll records and the use "
+                    + "of a message demarcator.  When using a message demarcator we can have far more uncommitted messages "
+                    + "than when we're not as there is much less for us to keep track of in memory.")
+            .required(false)
+            .defaultValue("1 secs")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("FlowFiles received from Kafka.  Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,8 +159,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final List<PropertyDescriptor> DESCRIPTORS;
     static final Set<Relationship> RELATIONSHIPS;
 
-    private volatile byte[] demarcatorBytes = null;
     private volatile ConsumerPool consumerPool = null;
+    private final Set<ConsumerLease> activeLeases = Collections.synchronizedSet(new HashSet<>());
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -165,6 +171,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
+        descriptors.add(MAX_UNCOMMITTED_TIME);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
         RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
     }
@@ -179,16 +186,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         return DESCRIPTORS;
     }
 
-    @OnScheduled
-    public void prepareProcessing(final ProcessContext context) {
-        this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
-                : null;
-    }
-
     @OnStopped
     public void close() {
-        demarcatorBytes = null;
         final ConsumerPool pool = consumerPool;
         consumerPool = null;
         if (pool != null) {
@@ -215,9 +214,21 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             return pool;
         }
 
-        final Map<String, String> props = new HashMap<>();
+        return consumerPool = createConsumerPool(context, getLogger());
+    }
+
+    protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+        final int maxLeases = context.getMaxConcurrentTasks();
+        final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
+                ? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+                : null;
+        final Map<String, Object> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         for (final String topic : topicListing.split(",", 100)) {
             final String trimmedName = topic.trim();
@@ -225,212 +236,78 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
                 topics.add(trimmedName);
             }
         }
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
-    }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
 
-    protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-        return new ConsumerPool(maxLeases, topics, props, log);
+        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }
 
-    @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final long startTimeNanos = System.nanoTime();
-        final ConsumerPool pool = getConsumerPool(context);
-        if (pool == null) {
-            context.yield();
-            return;
-        }
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
-        try (final ConsumerLease lease = pool.obtainConsumer()) {
+    @OnUnscheduled
+    public void interruptActiveThreads() {
+        // There are known issues with the Kafka client library that result in the client code hanging
+        // indefinitely when unable to communicate with the broker. In order to address this, we will wait
+        // up to 30 seconds for the Threads to finish and then will call Consumer.wakeup() to trigger the
+        // thread to wakeup when it is blocked, waiting on a response.
+        final long nanosToWait = TimeUnit.SECONDS.toNanos(5L);
+        final long start = System.nanoTime();
+        while (System.nanoTime() - start < nanosToWait && !activeLeases.isEmpty()) {
             try {
-                if (lease == null) {
-                    context.yield();
-                    return;
-                }
-
-                final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
-                if (!foundData) {
-                    session.rollback();
-                    return;
-                }
-
-                writeSessionData(context, session, partitionRecordMap, startTimeNanos);
-                //At-least once commit handling (if order is reversed it is at-most once)
-                session.commit();
-                commitOffsets(lease, partitionRecordMap);
-            } catch (final KafkaException ke) {
-                lease.poison();
-                getLogger().error("Problem while accessing kafka consumer " + ke, ke);
-                context.yield();
-                session.rollback();
+                Thread.sleep(100L);
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                return;
             }
         }
-    }
 
-    private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
-        final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
-        partitionRecordMap.entrySet().stream()
-                .filter(entry -> !entry.getValue().isEmpty())
-                .forEach((entry) -> {
-                    long maxOffset = entry.getValue().stream()
-                            .mapToLong(record -> record.offset())
-                            .max()
-                            .getAsLong();
-                    partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
-                });
-        lease.commitOffsets(partOffsetMap);
-    }
+        if (!activeLeases.isEmpty()) {
+            int count = 0;
+            for (final ConsumerLease lease : activeLeases) {
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                lease.wakeup();
+                count++;
+            }
 
-    private void writeSessionData(
-            final ProcessContext context, final ProcessSession session,
-            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
-            final long startTimeNanos) {
-        if (demarcatorBytes != null) {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .forEach(entry -> {
-                        writeData(context, session, entry.getValue(), startTimeNanos);
-                    });
-        } else {
-            partitionRecordMap.entrySet().stream()
-                    .filter(entry -> !entry.getValue().isEmpty())
-                    .flatMap(entry -> entry.getValue().stream())
-                    .forEach(record -> {
-                        writeData(context, session, Collections.singletonList(record), startTimeNanos);
-                    });
+            getLogger().info("Woke up {} consumers", new Object[] {count});
         }
-    }
 
-    private String encodeKafkaKey(final byte[] key, final String encoding) {
-        if (key == null) {
-            return null;
-        }
+        activeLeases.clear();
+    }
 
-        if (HEX_ENCODING.getValue().equals(encoding)) {
-            return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
-            return new String(key, StandardCharsets.UTF_8);
-        } else {
-            return null;    // won't happen because it is guaranteed by the Allowable Values
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final ConsumerPool pool = getConsumerPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
         }
-    }
 
-    private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
-        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
-        final String offset = String.valueOf(firstRecord.offset());
-        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
-        final String topic = firstRecord.topic();
-        final String partition = String.valueOf(firstRecord.partition());
-        FlowFile flowFile = session.create();
-        flowFile = session.write(flowFile, out -> {
-            boolean useDemarcator = false;
-            for (final ConsumerRecord<byte[], byte[]> record : records) {
-                if (useDemarcator) {
-                    out.write(demarcatorBytes);
-                }
-                out.write(record.value());
-                useDemarcator = true;
+        try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+            if (lease == null) {
+                context.yield();
+                return;
             }
-        });
-        final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyValue != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
-        }
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
-        if (records.size() > 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
-        }
-        flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
-        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(
-                context.getProperty(SECURITY_PROTOCOL).getValue(),
-                context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
-                topic);
-        session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
-        this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
-                new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
-        session.transfer(flowFile, REL_SUCCESS);
-    }
 
-    /**
-     * Populates the given partitionRecordMap with new records until we poll
-     * that returns no records or until we have enough data. It is important to
-     * ensure we keep items grouped by their topic and partition so that when we
-     * bundle them we bundle them intelligently and so that we can set offsets
-     * properly even across multiple poll calls.
-     */
-    private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
-        final long startNanos = System.nanoTime();
-        boolean foundData = false;
-        ConsumerRecords<byte[], byte[]> records;
-        final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-        do {
-            records = lease.poll();
-
-            for (final TopicPartition partition : records.partitions()) {
-                List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
-                if (currList == null) {
-                    currList = new ArrayList<>();
-                    partitionRecordMap.put(partition, currList);
+            activeLeases.add(lease);
+            try {
+                while (this.isScheduled() && lease.continuePolling()) {
+                    lease.poll();
                 }
-                currList.addAll(records.records(partition));
-                if (currList.size() > 0) {
-                    foundData = true;
+                if (this.isScheduled() && !lease.commit()) {
+                    context.yield();
                 }
+            } catch (final WakeupException we) {
+                getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
+                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+            } catch (final KafkaException kex) {
+                getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+                        new Object[]{lease, kex}, kex);
+            } catch (final Throwable t) {
+                getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+                        new Object[]{lease, t}, t);
+            } finally {
+                activeLeases.remove(lease);
             }
-            //If we received data and we still want to get more
-        } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
-        return foundData;
-    }
-
-    /**
-     * Determines if we have enough data as-is and should move on.
-     *
-     * @return true if we've been gathering for more than 500 ms or if we're
-     * demarcating and have more than 50 flowfiles worth or if we're per message
-     * and have more than 2000 flowfiles or if totalMessageSize is greater than
-     * two megabytes; false otherwise
-     *
-     * Implementation note: 500 millis and 5 MB are magic numbers. These may
-     * need to be tuned. They get at how often offsets will get committed to
-     * kafka relative to how many records will get buffered into memory in a
-     * poll call before writing to repos.
-     */
-    private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final int maxRecords, final long startTimeNanos) {
-
-        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
-        if (durationMillis > 500) {
-            return true;
-        }
-
-        int topicPartitionsFilled = 0;
-        int totalRecords = 0;
-        long totalRecordSize = 0;
-
-        for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
-            if (!recordList.isEmpty()) {
-                topicPartitionsFilled++;
-            }
-            totalRecords += recordList.size();
-            for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
-                totalRecordSize += rec.value().length;
-            }
-        }
-
-        if (demarcatorBytes != null && demarcatorBytes.length > 0) {
-            return topicPartitionsFilled > 50;
-        } else if (totalRecordSize > FIVE_MB) {
-            return true;
-        } else {
-            return totalRecords > maxRecords;
         }
     }
-
 }
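
The @OnUnscheduled handler above depends on the Kafka client's wakeup mechanism: wakeup() called from another thread makes a blocked poll() throw WakeupException on the polling thread, which is what lease.wakeup() ultimately triggers. A standalone sketch of that interaction using only the public Kafka 0.10 consumer API (props, running and the record handling are assumed placeholders):

    // Sketch: wakeup() breaks a blocked poll() by raising WakeupException on the polling thread.
    final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);   // props assumed configured
    consumer.subscribe(Arrays.asList("my-topic"));
    try {
        while (running) {
            final ConsumerRecords<byte[], byte[]> records = consumer.poll(10);   // may block on the broker
            // ... hand records off for processing ...
        }
    } catch (final WakeupException we) {
        // expected when another thread calls consumer.wakeup(), e.g. from interruptActiveThreads() above
    } finally {
        consumer.close();
    }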

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..97ebfc6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,28 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +47,108 @@ import org.apache.kafka.common.TopicPartition;
  * the lease will be returned to the pool for future use by others. A given
  * lease may only belong to a single thread a time.
  */
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+    private final long maxWaitMillis;
+    private final Consumer<byte[], byte[]> kafkaConsumer;
+    private final ComponentLog logger;
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
+    private boolean poisoned = false;
+    //used for tracking demarcated flowfiles to their TopicPartition so we can append
+    //to them on subsequent poll calls
+    private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+    private long leaseStartNanos = -1;
+    private boolean lastPollEmpty = false;
+    private int totalFlowFiles = 0;
+
+    ConsumerLease(
+            final long maxWaitMillis,
+            final Consumer<byte[], byte[]> kafkaConsumer,
+            final byte[] demarcatorBytes,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.maxWaitMillis = maxWaitMillis;
+        this.kafkaConsumer = kafkaConsumer;
+        this.demarcatorBytes = demarcatorBytes;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.logger = logger;
+    }
+
+    /**
+     * clears out internal state elements excluding session and consumer as
+     * those are managed by the pool itself
+     */
+    private void resetInternalState() {
+        bundleMap.clear();
+        uncommittedOffsetsMap.clear();
+        leaseStartNanos = -1;
+        lastPollEmpty = false;
+        totalFlowFiles = 0;
+    }
 
     /**
-     * Executes a poll on the underlying Kafka Consumer.
+     * Kafka will call this method whenever it is about to rebalance the
+     * consumers for the given partitions. We'll simply take this to mean that
+     * we need to quickly commit what we've got and will return the consumer to
+     * the pool. This method will be called during the poll() method call of
+     * this class and will be called by the same thread calling poll according
+     * to the Kafka API docs. After this method executes the session and kafka
+     * offsets are committed and this lease is closed.
      *
-     * @return ConsumerRecords retrieved in the poll.
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @param partitions partitions being reassigned
+     */
+    @Override
+    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
+        commit();
+    }
+
+    /**
+     * This will be called by Kafka when the rebalance has completed. We don't
+     * need to do anything with this information other than optionally log it as
+     * by this point we've committed what we've got and moved on.
+     *
+     * @param partitions topic partition set being reassigned
+     */
+    @Override
+    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+        logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+    }
+
+    /**
+     * Executes a poll on the underlying Kafka Consumer and creates any new
+     * flowfiles necessary or appends to existing ones if in demarcation mode.
      */
-    ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+    void poll() {
+        /**
+         * Implementation note: If we take too long (30 secs?) between Kafka
+         * poll calls - that is, in our own record processing before the next
+         * poll call or the commit - we can run into a situation where the
+         * commit will succeed against the session but fail when committing
+         * offsets. This is apparently different from the Kafka scenario of
+         * electing to rebalance for other reasons, but in this case it is due
+         * to a session timeout. It appears Kafka KIP-62 aims to offer more
+         * control over the meaning of various timeouts. If we do run into this
+         * case it could result in duplicates.
+         */
+        try {
+            final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+            lastPollEmpty = records.count() == 0;
+            processRecords(records);
+        } catch (final Throwable t) {
+            this.poison();
+            throw t;
+        }
+    }
 
     /**
      * Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +157,251 @@ public interface ConsumerLease extends Closeable {
      * kafka client to collect more data from Kafka before committing the
      * offsets.
      *
-     * @param offsets offsets
-     * @throws KafkaException if issue occurs talking to underlying resource.
+     * @return false if we didn't do anything and the caller should probably
+     * yield; true if we committed new data
+     *
      */
-    void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+    boolean commit() {
+        if (uncommittedOffsetsMap.isEmpty()) {
+            resetInternalState();
+            return false;
+        }
+        try {
+            /**
+             * Committing the nifi session then the offsets means we have an at
+             * least once guarantee here. If we reversed the order we'd have at
+             * most once.
+             */
+            final Collection<FlowFile> bundledFlowFiles = getBundles();
+            if (!bundledFlowFiles.isEmpty()) {
+                getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+            }
+            getProcessSession().commit();
+            kafkaConsumer.commitSync(uncommittedOffsetsMap);
+            resetInternalState();
+            return true;
+        } catch (final KafkaException kex) {
+            poison();
+            logger.warn("Duplicates are likely as we were able to commit the process"
+                    + " session but received an exception from Kafka while committing"
+                    + " offsets.");
+            throw kex;
+        } catch (final Throwable t) {
+            poison();
+            throw t;
+        }
+    }
 
     /**
-     * Notifies that this lease is poisoned and should not be reused.
+     * Indicates whether we should continue polling for data. If we are not
+     * writing data with a demarcator then we're writing individual flow files
+     * per Kafka message, so we must be very mindful of memory usage for
+     * the flow file objects (not their content) being held in memory. The
+     * content of Kafka messages will be written to the content repository
+     * immediately upon each poll call, but we must still be mindful of how much
+     * memory can be used in each poll call. We will indicate that we should
+     * stop polling if our last poll call produced no new results, if we've been
+     * polling and processing data longer than the specified maximum polling
+     * time, if we have reached our specified max flow file limit, or if a
+     * rebalance has been initiated for one of the partitions we're watching;
+     * otherwise we indicate that polling should continue.
+     *
+     * @return true if should keep polling; false otherwise
      */
-    void poison();
+    boolean continuePolling() {
+        //stop if the last poll produced no new data
+        if (lastPollEmpty) {
+            return false;
+        }
+
+        //stop if we've gone past our desired max uncommitted wait time
+        if (leaseStartNanos < 0) {
+            leaseStartNanos = System.nanoTime();
+        }
+        final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        if (durationMillis > maxWaitMillis) {
+            return false;
+        }
+
+        //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+        if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+            return false;
+        } else {
+            return totalFlowFiles < 15000; //admittedly a magic number - good candidate for a processor property
+        }
+    }
 
     /**
-     * Notifies that this lease is to be returned. The pool may optionally reuse
-     * this lease with another client. No further references by the caller
-     * should occur after calling close.
+     * Indicates that the underlying session and consumer should be immediately
+     * considered invalid. Once closed the session will be rolled back and the
+     * pool should destroy the underlying consumer. This is useful if due to
+     * external reasons, such as the processor no longer being scheduled, this
+     * lease should be terminated immediately.
+     */
+    private void poison() {
+        poisoned = true;
+    }
+
+    /**
+     * @return true if this lease has been poisoned; false otherwise
+     */
+    boolean isPoisoned() {
+        return poisoned;
+    }
+
+    /**
+     * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+     */
+    public void wakeup() {
+        kafkaConsumer.wakeup();
+    }
+
+    /**
+     * Method that is intended to be overridden by the pool that created
+     * this ConsumerLease object. It should ensure that the session associated
+     * with this lease is rolled back and that the underlying Kafka consumer
+     * is either returned to the pool for continued use or destroyed if this
+     * lease has been poisoned. It can only be called once; calling it more than
+     * once can result in undefined and non-threadsafe behavior.
      */
     @Override
-    void close();
+    public void close() {
+        resetInternalState();
+    }
+
+    public abstract ProcessSession getProcessSession();
+
+    private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+        records.partitions().stream().forEach(partition -> {
+            List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+            if (!messages.isEmpty()) {
+                //update maximum offset map for this topic partition
+                long maxOffset = messages.stream()
+                        .mapToLong(record -> record.offset())
+                        .max()
+                        .getAsLong();
+                uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+                //write records to content repository and session
+                if (demarcatorBytes == null) {
+                    totalFlowFiles += messages.size();
+                    messages.stream().forEach(message -> {
+                        writeData(getProcessSession(), message, partition);
+                    });
+                } else {
+                    writeData(getProcessSession(), messages, partition);
+                }
+            }
+        });
+    }
+
+    private static String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null;  // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
+    private Collection<FlowFile> getBundles() {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        for (final BundleTracker tracker : bundleMap.values()) {
+            populateAttributes(tracker);
+            flowFiles.add(tracker.flowFile);
+        }
+        return flowFiles;
+    }
+
+    private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+        FlowFile flowFile = session.create();
+        final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+        tracker.incrementRecordCount(1);
+        flowFile = session.write(flowFile, out -> {
+            out.write(record.value());
+        });
+        tracker.updateFlowFile(flowFile);
+        populateAttributes(tracker);
+        session.transfer(tracker.flowFile, REL_SUCCESS);
+    }
+
+    private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+        final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+        final boolean demarcateFirstRecord;
+        BundleTracker tracker = bundleMap.get(topicPartition);
+        FlowFile flowFile;
+        if (tracker == null) {
+            tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+            flowFile = session.create();
+            tracker.updateFlowFile(flowFile);
+            demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+        } else {
+            demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+        }
+        flowFile = tracker.flowFile;
+        tracker.incrementRecordCount(records.size());
+        flowFile = session.append(flowFile, out -> {
+            boolean useDemarcator = demarcateFirstRecord;
+            for (final ConsumerRecord<byte[], byte[]> record : records) {
+                if (useDemarcator) {
+                    out.write(demarcatorBytes);
+                }
+                out.write(record.value());
+                useDemarcator = true;
+            }
+        });
+        tracker.updateFlowFile(flowFile);
+        bundleMap.put(topicPartition, tracker);
+    }
+
+    private void populateAttributes(final BundleTracker tracker) {
+        final Map<String, String> kafkaAttrs = new HashMap<>();
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        if (tracker.key != null && tracker.totalRecords == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+        }
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        if (tracker.totalRecords > 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+        }
+        final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+        final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+        tracker.updateFlowFile(newFlowFile);
+    }
+
+    private static class BundleTracker {
+
+        final long initialOffset;
+        final int partition;
+        final String topic;
+        final String key;
+        FlowFile flowFile;
+        long totalRecords = 0;
+
+        private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+            this.initialOffset = initialRecord.offset();
+            this.partition = topicPartition.partition();
+            this.topic = topicPartition.topic();
+            this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+        }
+
+        private void incrementRecordCount(final long count) {
+            totalRecords += count;
+        }
+
+        private void updateFlowFile(final FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+
+    }
 
 }

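To make the new lease life-cycle easier to review, here is a minimal sketch of how a consume processor's onTrigger() is expected to drive it. This is illustrative only - the class name, the consumerPool field, and the yield-on-no-commit behavior are assumptions, not quotes from ConsumeKafka_0_10 (see that file's diff elsewhere in this commit for the real wiring). The key point is the ordering inside commit(): bundled flow files are transferred and the NiFi session is committed before the Kafka offsets, which is what gives the at-least-once guarantee noted in the comments above.

package org.apache.nifi.processors.kafka.pubsub;

import org.apache.kafka.common.KafkaException;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;

public class ConsumeLeaseSketch extends AbstractProcessor {

    // Assumed to be built in an @OnScheduled method from processor properties.
    private volatile ConsumerPool consumerPool;

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try (final ConsumerLease lease = consumerPool.obtainConsumer(session)) {
            if (lease == null) {
                context.yield(); // pool declined to hand out a consumer right now
                return;
            }
            try {
                // Keep polling until the lease says to stop: empty poll, max wait
                // exceeded, flow file limits hit, or a rebalance was initiated.
                while (isScheduled() && lease.continuePolling()) {
                    lease.poll();
                }
                // commit() transfers the bundled flow files, commits the NiFi session,
                // and only then commits the Kafka offsets (at-least-once ordering).
                if (isScheduled() && !lease.commit()) {
                    context.yield(); // nothing committed, so back off
                }
            } catch (final KafkaException kex) { // includes WakeupException
                getLogger().error("Exception while interacting with Kafka", kex);
                // The lease poisons itself on failure; closing it via the
                // try-with-resources rolls back the session and discards the consumer.
            }
        }
    }
}
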
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..baacdc7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.Closeable;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
 
 /**
  * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
  */
 public class ConsumerPool implements Closeable {
 
-    private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
-    private final int maxLeases;
-    private final Queue<ConsumerLease> consumerLeases;
+    private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
     private final Map<String, Object> kafkaProperties;
+    private final long maxWaitMillis;
     private final ComponentLog logger;
-
+    private final byte[] demarcatorBytes;
+    private final String keyEncoding;
+    private final String securityProtocol;
+    private final String bootstrapServers;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
-    private final AtomicLong productivePollCountRef = new AtomicLong();
-    private final AtomicLong unproductivePollCountRef = new AtomicLong();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
-     * indicated leases. Consumers are lazily initialized.
+     * indicated number of concurrent leases. Consumers are lazily
+     * initialized. We may elect not to create up to the maximum number of
+     * configured consumers if the broker-reported lag time for all topics is
+     * below a certain threshold.
      *
-     * @param maxLeases maximum number of active leases in the pool
-     * @param topics the topics to consume from
-     * @param kafkaProperties the properties for each consumer
+     * @param maxConcurrentLeases max allowable consumers at once
+     * @param demarcator bytes to use as demarcator between messages; null or
+     * empty means no demarcator
+     * @param kafkaProperties properties to use to initialize kafka consumers
+     * @param topics the topics to subscribe to
+     * @param maxWaitMillis maximum time to wait for a given lease to acquire
+     * data before committing
+     * @param keyEncoding the encoding to use for the key of a kafka message if
+     * found
+     * @param securityProtocol the security protocol used
+     * @param bootstrapServers the bootstrap servers
      * @param logger the logger to report any errors/warnings
      */
-    public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
-        this.maxLeases = maxLeases;
-        if (maxLeases <= 0) {
-            throw new IllegalArgumentException("Max leases value must be greather than zero.");
-        }
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final List<String> topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
-        if (topics == null || topics.isEmpty()) {
-            throw new IllegalArgumentException("Must have a list of one or more topics");
-        }
-        this.topics = topics;
-        this.kafkaProperties = new HashMap<>(kafkaProperties);
-        this.consumerLeases = new ArrayDeque<>();
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = Collections.unmodifiableList(topics);
     }
 
     /**
-     * Obtains a consumer from the pool if one is available
+     * Obtains a consumer from the pool if one is available or lazily
+     * initializes a new one if deemed necessary.
      *
-     * @return consumer from the pool
-     * @throws IllegalArgumentException if pool already contains
+     * @param session the session with which the consumer lease will be
+     * associated
+     * @return consumer to use or null if not available or necessary
      */
-    public ConsumerLease obtainConsumer() {
-        final ConsumerLease lease;
-        final int activeLeases;
-        synchronized (this) {
-            lease = consumerLeases.poll();
-            activeLeases = activeLeaseCount.get();
-        }
-        if (lease == null && activeLeases >= maxLeases) {
-            logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
-            return null;
+    public ConsumerLease obtainConsumer(final ProcessSession session) {
+        SimpleConsumerLease lease = pooledLeases.poll();
+        if (lease == null) {
+            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+            consumerCreatedCountRef.incrementAndGet();
+            /**
+             * For now return a new consumer lease. But we could later elect to
+             * have this return null if we determine the broker indicates that
+             * the lag time on all topics being monitored is sufficiently low.
+             * For now we should encourage conservative use of threads because
+             * having too many means we'll have, at best, useless threads sitting
+             * around doing frequent network calls and, at worst, consumers
+             * sitting idle, which could prompt excessive rebalances.
+             */
+            lease = new SimpleConsumerLease(consumer);
+            /**
+             * This subscription tightly couples the lease to the given
+             * consumer. They cannot be separated from then on.
+             */
+            consumer.subscribe(topics, lease);
         }
+        lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();
-        return (lease == null) ? createConsumer() : lease;
+        return lease;
     }
 
+    /**
+     * Exposed as protected method for easier unit testing
+     *
+     * @return consumer
+     * @throws KafkaException if unable to create the consumer
+     */
     protected Consumer<byte[], byte[]> createKafkaConsumer() {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
-    private ConsumerLease createConsumer() {
-        final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
-        consumerCreatedCountRef.incrementAndGet();
-        try {
-            kafkaConsumer.subscribe(topics);
-        } catch (final KafkaException kex) {
-            try {
-                kafkaConsumer.close();
-                consumerClosedCountRef.incrementAndGet();
-            } catch (final Exception ex) {
-                consumerClosedCountRef.incrementAndGet();
-                //ignore
-            }
-            throw kex;
-        }
-
-        final ConsumerLease lease = new ConsumerLease() {
-
-            private volatile boolean poisoned = false;
-            private volatile boolean closed = false;
-
-            @Override
-            public ConsumerRecords<byte[], byte[]> poll() {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-
-                try {
-                    final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
-                    if (records.isEmpty()) {
-                        unproductivePollCountRef.incrementAndGet();
-                    } else {
-                        productivePollCountRef.incrementAndGet();
-                    }
-                    return records;
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
-                if (poisoned) {
-                    throw new KafkaException("The consumer is poisoned and should no longer be used");
-                }
-                try {
-                    kafkaConsumer.commitSync(offsets);
-                } catch (final KafkaException kex) {
-                    logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
-                    poison();
-                    close();
-                    throw kex;
-                }
-            }
-
-            @Override
-            public void close() {
-                if (closed) {
-                    return;
-                }
-                if (poisoned || activeLeaseCount.get() > maxLeases) {
-                    closeConsumer(kafkaConsumer);
-                    activeLeaseCount.decrementAndGet();
-                    closed = true;
-                } else {
-                    final boolean added;
-                    synchronized (ConsumerPool.this) {
-                        added = consumerLeases.offer(this);
-                    }
-                    if (!added) {
-                        closeConsumer(kafkaConsumer);
-                        activeLeaseCount.decrementAndGet();
-                    }
-                }
-            }
-
-            @Override
-            public void poison() {
-                poisoned = true;
-            }
-        };
-        activeLeaseCount.incrementAndGet();
-        return lease;
-    }
-
     /**
-     * Closes all consumers in the pool. Can be safely recalled.
+     * Closes all consumers in the pool. Can be safely called repeatedly.
      */
     @Override
     public void close() {
-        final List<ConsumerLease> leases = new ArrayList<>();
-        synchronized (this) {
-            ConsumerLease lease = null;
-            while ((lease = consumerLeases.poll()) != null) {
-                leases.add(lease);
-            }
-        }
-        for (final ConsumerLease lease : leases) {
-            lease.poison();
-            lease.close();
-        }
+        final List<SimpleConsumerLease> leases = new ArrayList<>();
+        pooledLeases.drainTo(leases);
+        leases.stream().forEach((lease) -> {
+            lease.close(true);
+        });
     }
 
-    private void closeConsumer(final Consumer consumer) {
+    private void closeConsumer(final Consumer<?, ?> consumer) {
+        consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();
         } catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
 
         try {
             consumer.close();
-            consumerClosedCountRef.incrementAndGet();
         } catch (Exception e) {
-            consumerClosedCountRef.incrementAndGet();
             logger.warn("Failed while closing " + consumer, e);
         }
     }
 
     PoolStats getPoolStats() {
-        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+        return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+    }
+
+    private class SimpleConsumerLease extends ConsumerLease {
+
+        private final Consumer<byte[], byte[]> consumer;
+        private volatile ProcessSession session;
+        private volatile boolean closedConsumer;
+
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+            super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+            this.consumer = consumer;
+        }
+
+        void setProcessSession(final ProcessSession session) {
+            this.session = session;
+        }
+
+        @Override
+        public ProcessSession getProcessSession() {
+            return session;
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            close(false);
+        }
+
+        public void close(final boolean forceClose) {
+            if (closedConsumer) {
+                return;
+            }
+            super.close();
+            if (session != null) {
+                session.rollback();
+                setProcessSession(null);
+            }
+            if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+                closedConsumer = true;
+                closeConsumer(consumer);
+            }
+        }
     }
 
     static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
         final long consumerCreatedCount;
         final long consumerClosedCount;
         final long leasesObtainedCount;
-        final long productivePollCount;
-        final long unproductivePollCount;
 
         PoolStats(
                 final long consumerCreatedCount,
                 final long consumerClosedCount,
-                final long leasesObtainedCount,
-                final long productivePollCount,
-                final long unproductivePollCount
+                final long leasesObtainedCount
         ) {
             this.consumerCreatedCount = consumerCreatedCount;
             this.consumerClosedCount = consumerClosedCount;
             this.leasesObtainedCount = leasesObtainedCount;
-            this.productivePollCount = productivePollCount;
-            this.unproductivePollCount = unproductivePollCount;
         }
 
         @Override
         public String toString() {
             return "Created Consumers [" + consumerCreatedCount + "]\n"
                     + "Closed Consumers  [" + consumerClosedCount + "]\n"
-                    + "Leases Obtained   [" + leasesObtainedCount + "]\n"
-                    + "Productive Polls  [" + productivePollCount + "]\n"
-                    + "Unproductive Polls  [" + unproductivePollCount + "]\n";
+                    + "Leases Obtained   [" + leasesObtainedCount + "]\n";
         }
 
     }
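
Because createKafkaConsumer() is a protected factory hook, the pool and lease logic can be exercised without a broker by overriding it with a mock. A minimal sketch of that pattern follows; it is illustrative only - the test class name, the mocked values, and the constructor arguments such as the topic and bootstrap servers are assumptions, not the committed ConsumerPoolTest.

package org.apache.nifi.processors.kafka.pubsub;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.junit.Test;

public class ConsumerPoolSketchTest {

    @Test
    @SuppressWarnings("unchecked")
    public void healthyLeaseIsReturnedToThePool() {
        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
        when(consumer.poll(10)).thenReturn(ConsumerRecords.empty()); // lease sees an empty poll
        final ProcessSession session = mock(ProcessSession.class);
        final ComponentLog logger = mock(ComponentLog.class);

        // Override the protected factory hook so no real broker is needed.
        final ConsumerPool pool = new ConsumerPool(1, null, Collections.<String, Object>emptyMap(),
                Collections.singletonList("nifi"), 100L, "utf-8", "PLAINTEXT",
                "localhost:9092", logger) {
            @Override
            protected Consumer<byte[], byte[]> createKafkaConsumer() {
                return consumer;
            }
        };

        try (final ConsumerLease lease = pool.obtainConsumer(session)) {
            lease.poll(); // empty poll; the lease stays healthy (not poisoned)
        } // close(): the session is rolled back and the lease is offered back to the queue

        assertEquals(1, pool.getPoolStats().leasesObtainedCount);
        pool.close(); // drains the queue and force-closes the mocked consumer
    }
}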