Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/06 19:19:29 UTC

[3/5] nifi git commit: Revert "NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged"

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
new file mode 100644
index 0000000..b3f1bd1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class PublishKafkaTest {
+
+    @Test
+    public void validateCustomSerializerDeserializerSettings() throws Exception {
+        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3 sec");
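+        // Producer properties may be supplied as dynamic properties; a serializer class that cannot be loaded must fail validation.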
+        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "foo");
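+        // META_WAIT_TIME maps to the producer's 'max.block.ms' and must be a valid time period.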
+
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid"));
+        }
+    }
+
+    @Test
+    public void validateCustomValidation() {
+        String topicName = "validateCustomValidation";
+        PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+
+        /*
+         * Validates that the Kerberos service name is required when one of
+         * the SASL security protocols is selected.
+         */
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        try {
+            runner.run();
+            fail();
+        } catch (Throwable e) {
+            assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because"));
+        }
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateSingleCharacterDemarcatedMessages() {
+        String topicName = "validateSingleCharacterDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
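+        // The "\n" demarcator splits the content below into 7 messages, hence 7 expected sends.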
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
+        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+        String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+        StubPublishKafka putKafka = new StubPublishKafka(1);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+        runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3000 millis");
+
+        final String text = "Hello World\nGoodbye\nfail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+        putKafka.destroy();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(1);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+        final String text = "futurefail\nHello World\nGoodbye\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+        assertNotNull(ff);
+        runner.enqueue(ff);
+
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        // 5 sends expected in total: 4 on the first run plus 1 on the retry
+        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfuturefail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+        assertNotNull(ff);
+        runner.enqueue(ff);
+
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        // 6 sends expected in total: 4 on the first run plus 2 on the retry
+        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateDemarcationIntoEmptyMessages() {
+        String topicName = "validateDemarcationIntoEmptyMessages";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(PublishKafka_0_10.KEY, "key1");
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
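+        // Empty segments produced by consecutive demarcators are skipped; only "1", "2", "3" and "4" are sent.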
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
+        runner.enqueue(bytes);
+        runner.run(1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexRightPartialDemarcatedMessages() {
+        String topicName = "validateComplexRightPartialDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
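+        // A trailing partial demarcator is kept as part of the final message, so the payload below yields 3 sends.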
+        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexLeftPartialDemarcatedMessages() {
+        String topicName = "validateComplexLeftPartialDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
+        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexPartialMatchDemarcatedMessages() {
+        String topicName = "validateComplexPartialMatchDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
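+        // "WILDBOOMSTUFF" only partially matches the demarcator, so it stays inside the second message (2 sends).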
+        runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDBOOMSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
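+        // With the default UTF-8 key encoding, the resolved key "key1" is sent as its UTF-8 bytes.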
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateHexKey";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
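+        // With HEX encoding, the attribute value "6B657931" is decoded to the raw bytes of "key1".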
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
new file mode 100644
index 0000000..76c29cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+public class PublishingContextTest {
+
+    @Test
+    public void failInvalidConstructorArgs() {
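+        // A PublishingContext requires a non-null InputStream and a non-empty topic, and rejects an invalid last-acked message index such as -3.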
+        try {
+            new PublishingContext(null, null);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+        try {
+            new PublishingContext(mock(InputStream.class), null);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new PublishingContext(mock(InputStream.class), "");
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new PublishingContext(mock(InputStream.class), "mytopic", -3);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+    }
+
+    @Test
+    public void validateFullSetting() {
+        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3);
+        publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
+        publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
+
+        assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
+        assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8));
+        assertEquals("topic", publishingContext.getTopic());
+        assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString());
+    }
+
+    @Test
+    public void validateOnlyOnceSetPerInstance() {
+        PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic");
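+        // Key and delimiter bytes are write-once; a second attempt to set either must fail.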
+        publishingContext.setKeyBytes(new byte[]{0});
+        try {
+            publishingContext.setKeyBytes(new byte[]{0});
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        publishingContext.setDelimiterBytes(new byte[]{0});
+        try {
+            publishingContext.setDelimiterBytes(new byte[]{0});
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
new file mode 100644
index 0000000..c009014
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class StubPublishKafka extends PublishKafka_0_10 {
+
+    private volatile Producer<byte[], byte[]> producer;
+
+    private volatile boolean failed;
+
+    private final int ackCheckSize;
+
+    private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
+
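+    // ackCheckSize is injected into KafkaPublisher below; presumably it controls how many sends occur before pending acknowledgments are checked.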
+    StubPublishKafka(int ackCheckSize) {
+        this.ackCheckSize = ackCheckSize;
+    }
+
+    public Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
+
+    public void destroy() {
+        this.executor.shutdownNow();
+    }
+
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
+            throws ProcessException {
+        final Map<String, String> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        KafkaPublisher publisher;
+        try {
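+            // Set the processor's brokers field, then build a KafkaPublisher without invoking its constructor and wire in a mocked producer, a mock log and the ack check size via reflection.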
+            Field f = PublishKafka_0_10.class.getDeclaredField("brokers");
+            f.setAccessible(true);
+            f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
+            publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
+            publisher.setAckWaitTime(15000);
+            producer = mock(Producer.class);
+            this.instrumentProducer(producer, false);
+            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
+            kf.setAccessible(true);
+            kf.set(publisher, producer);
+
+            Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
+            componentLogF.setAccessible(true);
+            componentLogF.set(publisher, mock(ComponentLog.class));
+
+            Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
+            ackCheckSizeField.setAccessible(true);
+            ackCheckSizeField.set(publisher, this.ackCheckSize);
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new IllegalStateException(e);
+        }
+        return publisher;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
+
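+        // Stubbed send(): a record value of "fail" throws synchronously on its first occurrence,
+        // "futurefail" fails once when the returned Future is resolved, and everything else succeeds.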
+        when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
+            @Override
+            public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
+
+                String value = new String(record.value(), StandardCharsets.UTF_8);
+                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
+                    StubPublishKafka.this.failed = true;
+                    throw new RuntimeException("intentional");
+                }
+                Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
+                    @Override
+                    public RecordMetadata call() throws Exception {
+                        if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
+                            StubPublishKafka.this.failed = true;
+                            throw new TopicAuthorizationException("Unauthorized");
+                        } else {
+                            TopicPartition partition = new TopicPartition("foo", 0);
+                            RecordMetadata meta = new RecordMetadata(partition, 0, 0);
+                            return meta;
+                        }
+                    }
+                });
+                return future;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
deleted file mode 100644
index e54a10c..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestInFlightMessageTracker {
-
-    @Test(timeout = 5000L)
-    public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
-        final MockFlowFile flowFile = new MockFlowFile(1L);
-
-        final InFlightMessageTracker tracker = new InFlightMessageTracker();
-        tracker.incrementSentCount(flowFile);
-
-        verifyNotComplete(tracker);
-
-        tracker.incrementSentCount(flowFile);
-        verifyNotComplete(tracker);
-
-        tracker.incrementAcknowledgedCount(flowFile);
-        verifyNotComplete(tracker);
-
-        tracker.incrementAcknowledgedCount(flowFile);
-        tracker.awaitCompletion(1L);
-    }
-
-    @Test(timeout = 5000L)
-    public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
-        final MockFlowFile flowFile = new MockFlowFile(1L);
-
-        final InFlightMessageTracker tracker = new InFlightMessageTracker();
-        tracker.incrementSentCount(flowFile);
-
-        verifyNotComplete(tracker);
-
-        tracker.incrementSentCount(flowFile);
-        verifyNotComplete(tracker);
-
-        final ExecutorService exec = Executors.newFixedThreadPool(1);
-        final Future<?> future = exec.submit(() -> {
-            try {
-                tracker.awaitCompletion(10000L);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        tracker.incrementAcknowledgedCount(flowFile);
-        tracker.incrementAcknowledgedCount(flowFile);
-
-        future.get();
-    }
-
-    private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
-        try {
-            tracker.awaitCompletion(10L);
-            Assert.fail("Expected timeout");
-        } catch (final TimeoutException te) {
-            // expected
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
deleted file mode 100644
index c7d1a60..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestPublishKafka {
-    private static final String TOPIC_NAME = "unit-test";
-
-    private PublisherPool mockPool;
-    private PublisherLease mockLease;
-    private TestRunner runner;
-
-    @Before
-    public void setup() {
-        mockPool = mock(PublisherPool.class);
-        mockLease = mock(PublisherLease.class);
-
-        when(mockPool.obtainPublisher()).thenReturn(mockLease);
-
-        runner = TestRunners.newTestRunner(new PublishKafka_0_10() {
-            @Override
-            protected PublisherPool createPublisherPool(final ProcessContext context) {
-                return mockPool;
-            }
-        });
-
-        runner.setProperty(PublishKafka_0_10.TOPIC, TOPIC_NAME);
-    }
-
-    @Test
-    public void testSingleSuccess() throws IOException {
-        final MockFlowFile flowFile = runner.enqueue("hello world");
-
-        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(0)).poison();
-        verify(mockLease, times(1)).close();
-    }
-
-    @Test
-    public void testMultipleSuccess() throws IOException {
-        final Set<FlowFile> flowFiles = new HashSet<>();
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-
-
-        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 3);
-
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(0)).poison();
-        verify(mockLease, times(1)).close();
-    }
-
-    @Test
-    public void testSingleFailure() throws IOException {
-        final MockFlowFile flowFile = runner.enqueue("hello world");
-
-        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 1);
-
-        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(1)).close();
-    }
-
-    @Test
-    public void testMultipleFailures() throws IOException {
-        final Set<FlowFile> flowFiles = new HashSet<>();
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-
-        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 3);
-
-        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(1)).close();
-    }
-
-    @Test
-    public void testMultipleMessagesPerFlowFile() throws IOException {
-        final List<FlowFile> flowFiles = new ArrayList<>();
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-
-        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
-        msgCounts.put(flowFiles.get(0), 10);
-        msgCounts.put(flowFiles.get(1), 20);
-
-        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
-
-        when(mockLease.complete()).thenReturn(result);
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 2);
-
-        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(0)).poison();
-        verify(mockLease, times(1)).close();
-
-        runner.assertAllFlowFilesContainAttribute("msg.count");
-        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
-            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
-            .count());
-        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
-            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
-            .count());
-    }
-
-
-    @Test
-    public void testSomeSuccessSomeFailure() throws IOException {
-        final List<FlowFile> flowFiles = new ArrayList<>();
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-        flowFiles.add(runner.enqueue("hello world"));
-
-        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
-        msgCounts.put(flowFiles.get(0), 10);
-        msgCounts.put(flowFiles.get(1), 20);
-
-        final Map<FlowFile, Exception> failureMap = new HashMap<>();
-        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
-        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
-
-        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
-
-        when(mockLease.complete()).thenReturn(result);
-
-        runner.run();
-        runner.assertTransferCount(PublishKafka_0_10.REL_SUCCESS, 2);
-        runner.assertTransferCount(PublishKafka_0_10.REL_FAILURE, 2);
-
-        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
-        verify(mockLease, times(1)).complete();
-        verify(mockLease, times(1)).close();
-
-        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
-            .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
-            .count());
-        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
-            .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
-            .count());
-
-        assertTrue(runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).stream()
-            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
-    }
-
-
-    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
-        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
-    }
-
-    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
-        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
-        for (final FlowFile ff : successfulFlowFiles) {
-            msgCounts.put(ff, msgCountPerFlowFile);
-        }
-        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
-    }
-
-    private PublishResult createFailurePublishResult(final FlowFile failure) {
-        return createFailurePublishResult(Collections.singleton(failure));
-    }
-
-    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
-        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
-        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
-    }
-
-    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
-        // sanity check.
-        for (final FlowFile success : successFlowFiles) {
-            if (failures.containsKey(success)) {
-                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
-            }
-        }
-
-        return new PublishResult() {
-            @Override
-            public Collection<FlowFile> getSuccessfulFlowFiles() {
-                return successFlowFiles;
-            }
-
-            @Override
-            public Collection<FlowFile> getFailedFlowFiles() {
-                return failures.keySet();
-            }
-
-            @Override
-            public int getSuccessfulMessageCount(FlowFile flowFile) {
-                Integer count = msgCounts.get(flowFile);
-                return count == null ? 0 : count.intValue();
-            }
-
-            @Override
-            public Exception getReasonForFailure(FlowFile flowFile) {
-                return failures.get(flowFile);
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
deleted file mode 100644
index c2d143c..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-public class TestPublisherLease {
-    private ComponentLog logger;
-    private Producer<byte[], byte[]> producer;
-
-    @Before
-    @SuppressWarnings("unchecked")
-    public void setup() {
-        logger = Mockito.mock(ComponentLog.class);
-        producer = Mockito.mock(Producer.class);
-    }
-
-    @Test
-    public void testPoisonOnException() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
-            @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
-
-        final FlowFile flowFile = new MockFlowFile(1L);
-        final String topic = "unit-test";
-        final byte[] messageKey = null;
-        final byte[] demarcatorBytes = null;
-
-        final InputStream failureInputStream = new InputStream() {
-            @Override
-            public int read() throws IOException {
-                throw new IOException("Intentional Unit Test Exception");
-            }
-        };
-
-        try {
-            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
-            Assert.fail("Expected IOException");
-        } catch (final IOException ioe) {
-            // expected
-        }
-
-        assertEquals(1, poisonCount.get());
-
-        final PublishResult result = lease.complete();
-        assertTrue(result.getFailedFlowFiles().contains(flowFile));
-        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
-    }
-
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testPoisonOnFailure() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
-            @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
-
-        final FlowFile flowFile = new MockFlowFile(1L);
-        final String topic = "unit-test";
-        final byte[] messageKey = null;
-        final byte[] demarcatorBytes = null;
-
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(final InvocationOnMock invocation) throws Throwable {
-                final Callback callback = invocation.getArgumentAt(1, Callback.class);
-                callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
-                return null;
-            }
-        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
-
-        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
-
-        assertEquals(1, poisonCount.get());
-
-        final PublishResult result = lease.complete();
-        assertTrue(result.getFailedFlowFiles().contains(flowFile));
-        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
-    }
-
-    @Test
-    @SuppressWarnings("unchecked")
-    public void testAllDelimitedMessagesSent() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
-            @Override
-            protected void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
-
-        final AtomicInteger correctMessages = new AtomicInteger(0);
-        final AtomicInteger incorrectMessages = new AtomicInteger(0);
-        doAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
-                final byte[] value = record.value();
-                final String valueString = new String(value, StandardCharsets.UTF_8);
-                if ("1234567890".equals(valueString)) {
-                    correctMessages.incrementAndGet();
-                } else {
-                    incorrectMessages.incrementAndGet();
-                }
-
-                return null;
-            }
-        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
-
-        final FlowFile flowFile = new MockFlowFile(1L);
-        final String topic = "unit-test";
-        final byte[] messageKey = null;
-        final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
-
-        final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
-
-        final byte[] flowFileContent2 = new byte[0];
-        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
-
-        final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
-        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
-
-        final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
-        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
-
-        assertEquals(0, poisonCount.get());
-
-        verify(producer, times(0)).flush();
-
-        final PublishResult result = lease.complete();
-        assertTrue(result.getFailedFlowFiles().contains(flowFile));
-        assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
-
-        assertEquals(7, correctMessages.get());
-        assertEquals(0, incorrectMessages.get());
-
-        verify(producer, times(1)).flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
deleted file mode 100644
index 7c70194..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-public class TestPublisherPool {
-
-    @Test
-    public void testLeaseCloseReturnsToPool() {
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        kafkaProperties.put("bootstrap.servers", "localhost:1111");
-        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
-        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
-
-        final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
-        assertEquals(0, pool.available());
-
-        final PublisherLease lease = pool.obtainPublisher();
-        assertEquals(0, pool.available());
-
-        lease.close();
-        assertEquals(1, pool.available());
-    }
-
-    @Test
-    public void testPoisonedLeaseNotReturnedToPool() {
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        kafkaProperties.put("bootstrap.servers", "localhost:1111");
-        kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
-        kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
-
-        final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
-        assertEquals(0, pool.available());
-
-        final PublisherLease lease = pool.obtainPublisher();
-        assertEquals(0, pool.available());
-
-        lease.poison();
-        lease.close();
-        assertEquals(0, pool.available());
-    }
-
-}

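For reference, a short sketch of the lease lifecycle these two deleted tests exercised, using the same constructor arguments and mocks as above (the two numeric arguments, 1024 * 1024 and 1000L, are assumed here to be a max request size and an ack timeout):

    final PublisherPool pool = new PublisherPool(
            kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);

    final PublisherLease lease = pool.obtainPublisher(); // checked out: pool.available() == 0
    try {
        // ... publish FlowFiles through the lease ...
    } finally {
        lease.close(); // healthy lease: returned to the pool, pool.available() == 1
    }

    // A lease that hit a fatal error is poisoned first; close() then destroys
    // it instead of returning it, so pool.available() stays 0:
    // lease.poison();
    // lease.close();
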
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
new file mode 100644
index 0000000..0ed00fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.producer.KeyedMessage;
+import kafka.producer.OldProducer;
+
+/**
+ * Helper class for producing events targeting an {@link EmbeddedKafka}
+ * server.
+ */
+public class EmbeddedKafkaProducerHelper implements Closeable {
+
+    private final EmbeddedKafka kafkaServer;
+
+    private final OldProducer producer;
+
+    /**
+     * Will create an instance of EmbeddedKafkaProducerHelper based on the
+     * default configuration.<br>
+     * Default configuration includes:<br>
+     * <i>
+     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+     * serializer.class=kafka.serializer.DefaultEncoder<br>
+     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+     * auto.create.topics.enable=true
+     * </i><br>
+     * <br>
+     * If you wish to supply additional configuration properties or override
+     * existing ones, use the
+     * {@link EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, Properties)}
+     * constructor.
+     *
+     * @param kafkaServer
+     *            instance of {@link EmbeddedKafka}
+     */
+    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) {
+        this(kafkaServer, null);
+    }
+
+    /**
+     * Will create an instance of EmbeddedKafkaProducerHelper based on the
+     * default configuration and additional configuration properties.<br>
+     * Default configuration includes:<br>
+     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+     * serializer.class=kafka.serializer.DefaultEncoder<br>
+     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+     * auto.create.topics.enable=true<br>
+     * <br>
+     *
+     * @param kafkaServer
+     *            instance of {@link EmbeddedKafka}
+     * @param additionalProperties
+     *            instance of {@link Properties} specifying additional producer
+     *            configuration properties.
+     */
+    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties additionalProperties) {
+        this.kafkaServer = kafkaServer;
+        Properties producerProperties = new Properties();
+        producerProperties.put("metadata.broker.list", "localhost:" + this.kafkaServer.getKafkaPort());
+        producerProperties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+        producerProperties.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
+        producerProperties.put("auto.create.topics.enable", "true");
+        if (additionalProperties != null) {
+            producerProperties.putAll(additionalProperties);
+        }
+        this.producer = new OldProducer(producerProperties);
+    }
+
+    /**
+     * Will send an event to a Kafka topic. If the topic doesn't exist, it
+     * will be auto-created.
+     *
+     * @param topicName
+     *            Kafka topic name.
+     * @param event
+     *            string representing an event (message) to be sent to Kafka.
+     */
+    public void sendEvent(String topicName, String event) {
+        KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>(topicName, event.getBytes());
+        this.producer.send(data.topic(), data.key(), data.message());
+    }
+
+    /**
+     * Will close the underlying Kafka producer.
+     */
+    @Override
+    public void close() throws IOException {
+        this.producer.close();
+    }
+
+}

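A minimal usage sketch for the helper added above. It assumes an already-running EmbeddedKafka instance named kafkaServer (its construction and startup are not part of this diff), and the overridden property is only an example of an old producer API setting:

    Properties overrides = new Properties();
    overrides.put("request.required.acks", "1"); // example override, assumed valid for the old producer

    try (EmbeddedKafkaProducerHelper helper = new EmbeddedKafkaProducerHelper(kafkaServer, overrides)) {
        helper.sendEvent("unit-test-topic", "hello kafka"); // topic is auto-created on first send
    } // close() shuts down the underlying producer
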
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index e524589..5bc0e0e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.kafka;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
@@ -112,9 +111,8 @@ class KafkaPublisher implements Closeable {
      *            instance of {@link PublishingContext} which holds context
      *            information about the message(s) to be sent.
      * @return The index of the last successful offset.
-     * @throws IOException if unable to read from the Input Stream
      */
-    KafkaPublisherResult publish(PublishingContext publishingContext) throws IOException {
+    KafkaPublisherResult publish(PublishingContext publishingContext) {
         StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
             publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 64fdb1d..5792e64 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -223,8 +223,7 @@ public class ConsumeKafka extends AbstractProcessor {
         final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
                 ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                 : null;
-
-        final Map<String, Object> props = new HashMap<>();
+        final Map<String, String> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e13a8c3..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
-        final Map<String, Object> kafkaProperties,
+            final Map<String, String> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
             final String keyEncoding,
@@ -115,7 +115,6 @@ public class ConsumerPool implements Closeable {
              * sitting idle which could prompt excessive rebalances.
              */
             lease = new SimpleConsumerLease(consumer);
-
             /**
              * This subscription tightly couples the lease to the given
              * consumer. They cannot be separated from then on.
@@ -149,7 +148,7 @@ public class ConsumerPool implements Closeable {
         });
     }
 
-    private void closeConsumer(final Consumer<?, ?> consumer) {
+    private void closeConsumer(final Consumer consumer) {
         consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
deleted file mode 100644
index e7d5cb7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-public class InFlightMessageTracker {
-    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
-    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
-    private final Object progressMutex = new Object();
-
-    public void incrementAcknowledgedCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
-        counter.incrementAcknowledgedCount();
-
-        synchronized (progressMutex) {
-            progressMutex.notify();
-        }
-    }
-
-    public int getAcknowledgedCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.get(flowFile);
-        return (counter == null) ? 0 : counter.getAcknowledgedCount();
-    }
-
-    public void incrementSentCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
-        counter.incrementSentCount();
-    }
-
-    public int getSentCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.get(flowFile);
-        return (counter == null) ? 0 : counter.getSentCount();
-    }
-
-    public void fail(final FlowFile flowFile, final Exception exception) {
-        failures.putIfAbsent(flowFile, exception);
-
-        synchronized (progressMutex) {
-            progressMutex.notify();
-        }
-    }
-
-    public Exception getFailure(final FlowFile flowFile) {
-        return failures.get(flowFile);
-    }
-
-    public boolean isFailed(final FlowFile flowFile) {
-        return getFailure(flowFile) != null;
-    }
-
-    public void reset() {
-        messageCountsByFlowFile.clear();
-        failures.clear();
-    }
-
-    public PublishResult failOutstanding(final Exception exception) {
-        messageCountsByFlowFile.keySet().stream()
-            .filter(ff -> !isComplete(ff))
-            .filter(ff -> !failures.containsKey(ff))
-            .forEach(ff -> failures.put(ff, exception));
-
-        return createPublishResult();
-    }
-
-    private boolean isComplete(final FlowFile flowFile) {
-        final Counts counts = messageCountsByFlowFile.get(flowFile);
-        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
-            // all messages received successfully.
-            return true;
-        }
-
-        if (failures.containsKey(flowFile)) {
-            // the FlowFile failed, so it is complete
-            return true;
-        }
-
-        return false;
-    }
-
-    private boolean isComplete() {
-        return messageCountsByFlowFile.keySet().stream()
-            .allMatch(flowFile -> isComplete(flowFile));
-    }
-
-    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
-        final long startTime = System.nanoTime();
-        final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
-
-        while (System.nanoTime() < maxTime) {
-            synchronized (progressMutex) {
-                if (isComplete()) {
-                    return;
-                }
-
-                progressMutex.wait(millis);
-            }
-        }
-
-        throw new TimeoutException();
-    }
-
-
-    PublishResult createPublishResult() {
-        return new PublishResult() {
-            @Override
-            public Collection<FlowFile> getSuccessfulFlowFiles() {
-                if (failures.isEmpty()) {
-                    return messageCountsByFlowFile.keySet();
-                }
-
-                final Set<FlowFile> flowFiles = new HashSet<>(messageCountsByFlowFile.keySet());
-                flowFiles.removeAll(failures.keySet());
-                return flowFiles;
-            }
-
-            @Override
-            public Collection<FlowFile> getFailedFlowFiles() {
-                return failures.keySet();
-            }
-
-            @Override
-            public int getSuccessfulMessageCount(final FlowFile flowFile) {
-                return getAcknowledgedCount(flowFile);
-            }
-
-            @Override
-            public Exception getReasonForFailure(final FlowFile flowFile) {
-                return getFailure(flowFile);
-            }
-        };
-    }
-
-    public static class Counts {
-        private final AtomicInteger sentCount = new AtomicInteger(0);
-        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
-
-        public void incrementSentCount() {
-            sentCount.incrementAndGet();
-        }
-
-        public void incrementAcknowledgedCount() {
-            acknowledgedCount.incrementAndGet();
-        }
-
-        public int getAcknowledgedCount() {
-            return acknowledgedCount.get();
-        }
-
-        public int getSentCount() {
-            return sentCount.get();
-        }
-    }
-}

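The class deleted above tracked per-FlowFile sent and acknowledged message counts so a publish could block until every message was accounted for. A sketch of how its API fit together; the callback wiring is illustrative (the real caller was the PublisherLease removed elsewhere in this commit), and flowFile, producer, record, and maxAckWaitMillis are assumed to be in scope. Since awaitCompletion() and createPublishResult() are package-private, the sketch also assumes same-package access:

    final InFlightMessageTracker tracker = new InFlightMessageTracker();

    // For every message sent to Kafka on behalf of a FlowFile:
    tracker.incrementSentCount(flowFile);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            tracker.fail(flowFile, exception); // also wakes awaitCompletion()
        } else {
            tracker.incrementAcknowledgedCount(flowFile);
        }
    });

    // After the last send, wait for the acknowledgements to catch up.
    try {
        tracker.awaitCompletion(maxAckWaitMillis);
        final PublishResult result = tracker.createPublishResult();
    } catch (final TimeoutException e) {
        // Marks every still-outstanding FlowFile as failed with this exception.
        final PublishResult result = tracker.failOutstanding(e);
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
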
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3d09f2d..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,9 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -187,7 +186,7 @@ final class KafkaProcessorUtils {
 
         final Class<?> classType;
 
-        public KafkaConfigValidator(final Class<?> classType) {
+        public KafkaConfigValidator(final Class classType) {
             this.classType = classType;
         }
 
@@ -212,8 +211,7 @@ final class KafkaProcessorUtils {
         return builder.toString();
     }
 
-
-    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+    static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
         for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
             if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
                 // Translate SSLContext Service configuration into Kafka properties
@@ -232,33 +230,28 @@ final class KafkaProcessorUtils {
                     mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
                 }
             }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+            String pName = propertyDescriptor.getName();
+            String pValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null) {
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
+            if (pValue != null) {
+                if (pName.endsWith(".ms")) { // kafka standard time notation
+                    pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
                 }
-
-                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
+                if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+                    mapToPopulate.put(pName, pValue);
                 }
             }
         }
     }
 
-    private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+    private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
         return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
     }
 
-    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
+    private static Set<String> getPublicStaticStringFieldValues(final Class... classes) {
         final Set<String> strings = new HashSet<>();
-        for (final Class<?> classType : classes) {
+        for (final Class classType : classes) {
             for (final Field field : classType.getDeclaredFields()) {
                 if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
                     try {