You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/15 18:51:46 UTC

[07/20] nifi git commit: NIFI-3198: Refactored how PublishKafka and PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in StreamDemarcator. Slight refactoring of consume processors to simplify code.

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7e4b12c..8e3fa3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,105 +16,36 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
-    static class MockConsumerPool extends ConsumerPool {
-
-        final int actualMaxLeases;
-        final List<String> actualTopics;
-        final Map<String, String> actualKafkaProperties;
-        boolean throwKafkaExceptionOnPoll = false;
-        boolean throwKafkaExceptionOnCommit = false;
-        Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
-        Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
-        Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
-        boolean wasConsumerLeasePoisoned = false;
-        boolean wasConsumerLeaseClosed = false;
-        boolean wasPoolClosed = false;
-
-        public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
-            super(maxLeases, topics, kafkaProperties, null);
-            actualMaxLeases = maxLeases;
-            actualTopics = topics;
-            actualKafkaProperties = kafkaProperties;
-        }
-
-        @Override
-        public ConsumerLease obtainConsumer() {
-            return new ConsumerLease() {
-                @Override
-                public ConsumerRecords<byte[], byte[]> poll() {
-                    if (throwKafkaExceptionOnPoll) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
-                    return (records == null) ? ConsumerRecords.empty() : records;
-                }
-
-                @Override
-                public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
-                    if (throwKafkaExceptionOnCommit) {
-                        throw new KafkaException("i planned to fail");
-                    }
-                    actualCommitOffsets = offsets;
-                }
-
-                @Override
-                public void poison() {
-                    wasConsumerLeasePoisoned = true;
-                }
-
-                @Override
-                public void close() {
-                    wasConsumerLeaseClosed = true;
-                }
-            };
-        }
-
-        @Override
-        public void close() {
-            wasPoolClosed = true;
-        }
-
-        void resetState() {
-            throwKafkaExceptionOnPoll = false;
-            throwKafkaExceptionOnCommit = false;
-            nextPlannedRecordsQueue = null;
-            nextExpectedCommitOffsets = null;
-            wasConsumerLeasePoisoned = false;
-            wasConsumerLeaseClosed = false;
-            wasPoolClosed = false;
-        }
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
 
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
     }
 
     @Test
@@ -175,365 +106,45 @@ public class ConsumeKafkaTest {
     public void validateGetAllMessages() throws Exception {
         String groupName = "validateGetAllMessages";
 
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
-            assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-        } else {
-            assertEquals(2, mockPool.actualCommitOffsets.size());
-            assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        }
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @Test
-    public void validateGetLotsOfMessages() throws Exception {
-        String groupName = "validateGetLotsOfMessages";
-
-        final byte[][] firstPassValues = new byte[10010][1];
-        for (final byte[] value : firstPassValues) {
-            value[0] = 0x12;
-        }
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-        mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
-        assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
-        assertEquals(1, mockPool.actualCommitOffsets.size());
-        assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-        for (final byte[] rawRecord : rawRecords) {
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        final TopicPartition tPart = new TopicPartition(topic, partition);
-        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
-        long offset = startingOffset;
-
-        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
-            final byte[] key = entry.getKey();
-            final byte[] rawRecord = entry.getValue();
-            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
-            records.add(rec);
-        }
-        map.put(tPart, records);
-        return new ConsumerRecords(map);
-    }
-
-    private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
-        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
-        for (final ConsumerRecords<byte[], byte[]> rec : records) {
-            rec.partitions().stream().forEach((part) -> {
-                final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
-                if (map.get(part) != null) {
-                    throw new IllegalStateException("already have that topic/partition in the record map");
-                }
-                map.put(part, conRecs);
-            });
-        }
-        return new ConsumerRecords<>(map);
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
-        String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final byte[][] secondPassValues = new byte[][]{
-            "Hello-4".getBytes(StandardCharsets.UTF_8),
-            "Hello-5".getBytes(StandardCharsets.UTF_8),
-            "Hello-6".getBytes(StandardCharsets.UTF_8)
-        };
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues),
-                createConsumerRecords("bar", 1, 1L, secondPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
-
         final TestRunner runner = TestRunners.newTestRunner(proc);
         runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertEquals(2, mockPool.actualCommitOffsets.size());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-        assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
-    }
-
-    @Test
-    public void validatePollException() throws Exception {
-        String groupName = "validatePollException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnPoll = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(0, flowFiles.size());
-        assertNull(null, mockPool.actualCommitOffsets);
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
-
-    @Test
-    public void validateCommitOffsetException() throws Exception {
-        String groupName = "validateCommitOffsetException";
-
-        final byte[][] firstPassValues = new byte[][]{
-            "Hello-1".getBytes(StandardCharsets.UTF_8),
-            "Hello-2".getBytes(StandardCharsets.UTF_8),
-            "Hello-3".getBytes(StandardCharsets.UTF_8)
-        };
-
-        final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
-                createConsumerRecords("foo", 1, 1L, firstPassValues)
-        );
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
-        mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-        mockPool.throwKafkaExceptionOnCommit = true;
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
-        runner.run(1, true);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(1, flowFiles.size());
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertTrue(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-
-        assertNull(null, mockPool.actualCommitOffsets);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
     @Test
-    public void validateUtf8Key() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
 
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
         ConsumeKafka proc = new ConsumeKafka() {
             @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -542,89 +153,15 @@ public class ConsumeKafkaTest {
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
         runner.run(1, false);
 
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
     }
 
-    @Test
-    public void validateHexKey() {
-        String groupName = "validateGetAllMessages";
-
-        final Map<byte[], byte[]> rawRecords = new HashMap<>();
-        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
-        rawRecords.put(new byte[0], "Hello-2".getBytes());
-        rawRecords.put(null, "Hello-3".getBytes());
-
-        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
-        final List<String> expectedTopics = new ArrayList<>();
-        expectedTopics.add("foo");
-        expectedTopics.add("bar");
-        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
-        mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
-                return mockPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(expectedTopics, mockPool.actualTopics);
-
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
-        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
-        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
-        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
-        //asert that all consumers were closed as expected
-        //assert that the consumer pool was properly closed
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertFalse(mockPool.wasPoolClosed);
-        runner.run(1, true);
-        assertFalse(mockPool.wasConsumerLeasePoisoned);
-        assertTrue(mockPool.wasConsumerLeaseClosed);
-        assertTrue(mockPool.wasPoolClosed);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerPoolTest {
 
     Consumer<byte[], byte[]> consumer = null;
+    ProcessSession mockSession = null;
+    ProvenanceReporter mockReporter = null;
+    ConsumerPool testPool = null;
+    ConsumerPool testDemarcatedPool = null;
     ComponentLog logger = null;
 
     @Before
     public void setup() {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
-    }
-
-    @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
             }
         };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
 
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
             lease.poll();
-            lease.commitOffsets(Collections.emptyMap());
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
-        assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(1, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
+        assertEquals(4, stats.leasesObtainedCount);
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
 
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
-            try (final ConsumerLease lease = testPool.obtainConsumer()) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
                 for (int j = 0; j < 100; j++) {
                     lease.poll();
                 }
-                lease.commitOffsets(Collections.emptyMap());
             }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(100, stats.leasesObtainedCount);
-        assertEquals(10000, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
 
     @Test
-    public void validatePoolConsumerFails() throws Exception {
-
-        final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
-            @Override
-            protected Consumer<byte[], byte[]> createKafkaConsumer() {
-                return consumer;
-            }
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
         };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
 
-        when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
-        try (final ConsumerLease lease = testPool.obtainConsumer()) {
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
             lease.poll();
-            fail();
-        } catch (final KafkaException ke) {
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
 
+            }
         }
         testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
         final PoolStats stats = testPool.getPoolStats();
         assertEquals(1, stats.consumerCreatedCount);
         assertEquals(1, stats.consumerClosedCount);
         assertEquals(1, stats.leasesObtainedCount);
-        assertEquals(0, stats.unproductivePollCount);
-        assertEquals(0, stats.productivePollCount);
     }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
deleted file mode 100644
index 19c64af..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-public class KafkaPublisherTest {
-
-    private static EmbeddedKafka kafkaLocal;
-
-    private static EmbeddedKafkaProducerHelper producerHelper;
-
-    @BeforeClass
-    public static void beforeClass() {
-        kafkaLocal = new EmbeddedKafka();
-        kafkaLocal.start();
-        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        producerHelper.close();
-        kafkaLocal.stop();
-    }
-
-    @Test
-    public void validateSuccessfulSendAsWhole() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsWhole";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(0, result.getLastMessageAcked());
-        assertEquals(1, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    @Test
-    public void validateSuccessfulSendAsDelimited() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream(
-                "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsDelimited";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(3, result.getLastMessageAcked());
-        assertEquals(4, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    /*
-     * This test simulates the condition where not all messages were ACKed by
-     * Kafka
-     */
-    @Test
-    public void validateRetries() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateSuccessfulReSendOfFailedSegments";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 1;
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String m1 = new String(iter.next().message());
-        String m2 = new String(iter.next().message());
-        assertEquals("Hello Kafka3", m1);
-        assertEquals("Hello Kafka4", m2);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 2;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-
-        m1 = new String(iter.next().message());
-        assertEquals("Hello Kafka4", m1);
-
-        publisher.close();
-    }
-
-    /*
-     * Similar to the above test, but it sets the first retry index to the last
-     * possible message index and second index to an out of bound index. The
-     * expectation is that no messages will be sent to Kafka
-     */
-    @Test
-    public void validateRetriesWithWrongIndex() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateRetriesWithWrongIndex";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 3;
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 6;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        publisher.close();
-    }
-
-    @Test
-    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
-        String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
-        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithMultiByteCharacters";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-
-        publisher.publish(publishingContext);
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
-        assertEquals(data, r);
-    }
-
-    @Test
-    public void validateWithNonDefaultPartitioner() throws Exception {
-        String data = "fooandbarandbaz";
-        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithNonDefaultPartitioner";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
-
-        try {
-            publisher.publish(publishingContext);
-            // partitioner should be invoked 3 times
-            assertTrue(TestPartitioner.counter == 3);
-            publisher.close();
-        } finally {
-            TestPartitioner.counter = 0;
-        }
-    }
-
-    private Properties buildProducerProperties() {
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
-        kafkaProperties.put("auto.create.topics.enable", "true");
-        return kafkaProperties;
-    }
-
-    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
-        Properties props = new Properties();
-        props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
-        props.put("group.id", "test");
-        props.put("consumer.timeout.ms", "500");
-        props.put("auto.offset.reset", "smallest");
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
-        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
-        Map<String, Integer> topicCountMap = new HashMap<>(1);
-        topicCountMap.put(topic, 1);
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
-        return iter;
-    }
-
-    public static class TestPartitioner implements Partitioner {
-
-        static int counter;
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-            // nothing to do, test
-        }
-
-        @Override
-        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
-                Cluster cluster) {
-            counter++;
-            return 0;
-        }
-
-        @Override
-        public void close() {
-            counter = 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
deleted file mode 100644
index d81f0c1..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.times;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
-
-public class PublishKafkaTest {
-
-    @Test
-    public void validateCustomSerilaizerDeserializerSettings() throws Exception {
-        PublishKafka publishKafka = new PublishKafka();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka.TOPIC, "foo");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec");
-        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        runner.assertValid();
-        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
-    }
-
-    @Test
-    public void validatePropertiesValidation() throws Exception {
-        PublishKafka publishKafka = new PublishKafka();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka.TOPIC, "foo");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "foo");
-
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid"));
-        }
-    }
-
-    @Test
-    public void validateCustomValidation() {
-        String topicName = "validateCustomValidation";
-        PublishKafka publishKafka = new PublishKafka();
-
-        /*
-         * Validates that Kerberos principle is required if one of SASL set for
-         * secirity protocol
-         */
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
-        try {
-            runner.run();
-            fail();
-        } catch (Throwable e) {
-            assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because"));
-        }
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateSingleCharacterDemarcatedMessages() {
-        String topicName = "validateSingleCharacterDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
-        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
-        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
-        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-        putKafka.destroy();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "futurefail\nHello World\nGoodbye\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 6 sends due to duplication
-        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfuturefail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 6 sends due to duplication
-        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateDemarcationIntoEmptyMessages() {
-        String topicName = "validateDemarcationIntoEmptyMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        final TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
-        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
-        runner.enqueue(bytes);
-        runner.run(1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexRightPartialDemarcatedMessages() {
-        String topicName = "validateComplexRightPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
-        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexLeftPartialDemarcatedMessages() {
-        String topicName = "validateComplexLeftPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
-        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexPartialMatchDemarcatedMessages() {
-        String topicName = "validateComplexPartialMatchDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
-        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDBOOMSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @Test
-    public void validateUtf8Key() {
-        String topicName = "validateUtf8Key";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.KEY, "${myKey}");
-
-        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
-        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        final Map<Object, Object> msgs = putKafka.getMessagesSent();
-        assertEquals(1, msgs.size());
-        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
-        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
-    }
-
-    @Test
-    public void validateHexKey() {
-        String topicName = "validateUtf8Key";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.KEY_ATTRIBUTE_ENCODING, PublishKafka.HEX_ENCODING);
-        runner.setProperty(PublishKafka.KEY, "${myKey}");
-
-        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
-        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
-        runner.run(1);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        final Map<Object, Object> msgs = putKafka.getMessagesSent();
-        assertEquals(1, msgs.size());
-        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
-
-        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
deleted file mode 100644
index 76c29cd..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Test;
-
-public class PublishingContextTest {
-
-    @Test
-    public void failInvalidConstructorArgs() {
-        try {
-            new PublishingContext(null, null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-        try {
-            new PublishingContext(mock(InputStream.class), null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "");
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "mytopic", -3);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-
-    @Test
-    public void validateFullSetting() {
-        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3);
-        publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
-        publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
-
-        assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
-        assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8));
-        assertEquals("topic", publishingContext.getTopic());
-        assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString());
-    }
-
-    @Test
-    public void validateOnlyOnceSetPerInstance() {
-        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic");
-        publishingContext.setKeyBytes(new byte[]{0});
-        try {
-            publishingContext.setKeyBytes(new byte[]{0});
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        publishingContext.setDelimiterBytes(new byte[]{0});
-        try {
-            publishingContext.setDelimiterBytes(new byte[]{0});
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
deleted file mode 100644
index 533655e..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class StubPublishKafka extends PublishKafka {
-
-    private volatile Producer<byte[], byte[]> producer;
-
-    private volatile boolean failed;
-
-    private final int ackCheckSize;
-
-    private final ExecutorService executor = Executors.newCachedThreadPool();
-    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
-
-    StubPublishKafka(int ackCheckSize) {
-        this.ackCheckSize = ackCheckSize;
-    }
-
-    public Producer<byte[], byte[]> getProducer() {
-        return producer;
-    }
-
-    public void destroy() {
-        this.executor.shutdownNow();
-    }
-
-    public Map<Object, Object> getMessagesSent() {
-        return new HashMap<>(msgsSent);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
-            throws ProcessException {
-        final Map<String, String> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        KafkaPublisher publisher;
-        try {
-            Field f = PublishKafka.class.getDeclaredField("brokers");
-            f.setAccessible(true);
-            f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
-            publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
-            publisher.setAckWaitTime(15000);
-            producer = mock(Producer.class);
-
-            this.instrumentProducer(producer, false);
-            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
-            kf.setAccessible(true);
-            kf.set(publisher, producer);
-
-            Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
-            componentLogF.setAccessible(true);
-            componentLogF.set(publisher, mock(ComponentLog.class));
-
-            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
-            ackCheckSizeField.setAccessible(true);
-            ackCheckSizeField.set(publisher, this.ackCheckSize);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IllegalStateException(e);
-        }
-        return publisher;
-    }
-
-    @SuppressWarnings("unchecked")
-    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
-
-        when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
-            @Override
-            public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
-                if (record != null && record.key() != null) {
-                    msgsSent.put(record.key(), record.value());
-                }
-
-                String value = new String(record.value(), StandardCharsets.UTF_8);
-                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
-                    StubPublishKafka.this.failed = true;
-                    throw new RuntimeException("intentional");
-                }
-                Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
-                    @Override
-                    public RecordMetadata call() throws Exception {
-                        if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
-                            StubPublishKafka.this.failed = true;
-                            throw new TopicAuthorizationException("Unauthorized");
-                        } else {
-                            TopicPartition partition = new TopicPartition("foo", 0);
-                            RecordMetadata meta = new RecordMetadata(partition, 0, 0);
-                            return meta;
-                        }
-                    }
-                });
-                return future;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..e54a10c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInFlightMessageTracker {
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker();
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.awaitCompletion(1L);
+    }
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker();
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        final Future<?> future = exec.submit(() -> {
+            try {
+                tracker.awaitCompletion(10000L);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.incrementAcknowledgedCount(flowFile);
+
+        future.get();
+    }
+
+    private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
+        try {
+            tracker.awaitCompletion(10L);
+            Assert.fail("Expected timeout");
+        } catch (final TimeoutException te) {
+            // expected
+        }
+    }
+
+}