Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/06 19:19:31 UTC

[5/5] nifi git commit: Revert "NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged"

Revert "NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged"

This reverts commit 92cca96d49042f9898f93b3a2d2210b924708e52.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a4ed6221
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a4ed6221
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a4ed6221

Branch: refs/heads/master
Commit: a4ed622152187155463af2b748c9bf492621bbc7
Parents: 53f7a21
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Oct 6 15:19:00 2016 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Thu Oct 6 15:19:00 2016 -0400

----------------------------------------------------------------------
 .../io/exception/TokenTooLargeException.java    |  26 -
 .../nifi/stream/io/util/StreamDemarcator.java   |  38 +-
 .../stream/io/util/StreamDemarcatorTest.java    |  84 +--
 .../nifi/util/StandardProcessorTestRunner.java  |  40 +-
 .../java/org/apache/nifi/util/TestRunner.java   |  16 +-
 .../kafka/pubsub/ConsumeKafka_0_10.java         |   2 +-
 .../processors/kafka/pubsub/ConsumerPool.java   |   4 +-
 .../kafka/pubsub/InFlightMessageTracker.java    | 178 -----
 .../kafka/pubsub/KafkaProcessorUtils.java       |  33 +-
 .../processors/kafka/pubsub/KafkaPublisher.java | 236 +++++++
 .../kafka/pubsub/PublishKafka_0_10.java         | 652 ++++++++++++-------
 .../processors/kafka/pubsub/PublishResult.java  |  56 --
 .../processors/kafka/pubsub/PublisherLease.java | 132 ----
 .../processors/kafka/pubsub/PublisherPool.java  |  98 ---
 .../kafka/pubsub/PublishingContext.java         | 124 ++++
 .../kafka/pubsub/KafkaPublisherTest.java        | 306 +++++++++
 .../kafka/pubsub/PublishKafkaTest.java          | 375 +++++++++++
 .../kafka/pubsub/PublishingContextTest.java     |  91 +++
 .../kafka/pubsub/StubPublishKafka.java          | 143 ++++
 .../pubsub/TestInFlightMessageTracker.java      |  87 ---
 .../kafka/pubsub/TestPublishKafka.java          | 262 --------
 .../kafka/pubsub/TestPublisherLease.java        | 194 ------
 .../kafka/pubsub/TestPublisherPool.java         |  68 --
 .../kafka/test/EmbeddedKafkaProducerHelper.java | 110 ++++
 .../nifi/processors/kafka/KafkaPublisher.java   |   4 +-
 .../processors/kafka/pubsub/ConsumeKafka.java   |   3 +-
 .../processors/kafka/pubsub/ConsumerPool.java   |   5 +-
 .../kafka/pubsub/InFlightMessageTracker.java    | 178 -----
 .../kafka/pubsub/KafkaProcessorUtils.java       |  33 +-
 .../processors/kafka/pubsub/KafkaPublisher.java | 236 +++++++
 .../processors/kafka/pubsub/PublishKafka.java   | 639 +++++++++++-------
 .../processors/kafka/pubsub/PublishResult.java  |  56 --
 .../processors/kafka/pubsub/PublisherLease.java | 132 ----
 .../processors/kafka/pubsub/PublisherPool.java  |  98 ---
 .../kafka/pubsub/PublishingContext.java         | 124 ++++
 .../org.apache.nifi.processor.Processor         |   2 +-
 .../kafka/pubsub/KafkaPublisherTest.java        | 306 +++++++++
 .../kafka/pubsub/PublishKafkaTest.java          | 375 +++++++++++
 .../kafka/pubsub/PublishingContextTest.java     |  91 +++
 .../kafka/pubsub/StubPublishKafka.java          | 144 ++++
 .../pubsub/TestInFlightMessageTracker.java      |  87 ---
 .../kafka/pubsub/TestPublishKafka.java          | 262 --------
 .../kafka/pubsub/TestPublisherLease.java        | 194 ------
 .../kafka/pubsub/TestPublisherPool.java         |  68 --
 .../kafka/test/EmbeddedKafkaProducerHelper.java | 110 ++++
 45 files changed, 3665 insertions(+), 2837 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
deleted file mode 100644
index 7024f34..0000000
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/exception/TokenTooLargeException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.stream.io.exception;
-
-import java.io.IOException;
-
-public class TokenTooLargeException extends IOException {
-    public TokenTooLargeException(final String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
index dc3d829..3064f1c 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
@@ -16,12 +16,9 @@
  */
 package org.apache.nifi.stream.io.util;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.nifi.stream.io.exception.TokenTooLargeException;
-
 /**
  * The <code>StreamDemarcator</code> class takes an input stream and demarcates
  * it so it could be read (see {@link #nextToken()}) as individual byte[]
@@ -29,7 +26,7 @@ import org.apache.nifi.stream.io.exception.TokenTooLargeException;
  * stream will be read into a single token which may result in
  * {@link OutOfMemoryError} if stream is too large.
  */
-public class StreamDemarcator implements Closeable {
+public class StreamDemarcator {
 
     private final static int INIT_BUFFER_SIZE = 8192;
 
@@ -98,10 +95,8 @@ public class StreamDemarcator implements Closeable {
     /**
      * Will read the next data token from the {@link InputStream} returning null
      * when it reaches the end of the stream.
-     *
-     * @throws IOException if unable to read from the stream
      */
-    public byte[] nextToken() throws IOException {
+    public byte[] nextToken() {
         byte[] data = null;
         int j = 0;
 
@@ -131,10 +126,8 @@ public class StreamDemarcator implements Closeable {
     /**
      * Will fill the current buffer from the current 'index' position, expanding it
      * and/or shuffling it if necessary
-     *
-     * @throws IOException if unable to read from the stream
      */
-    private void fill() throws IOException {
+    private void fill() {
         if (this.index >= this.buffer.length) {
             if (this.mark == 0) { // expand
                 byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
@@ -148,16 +141,20 @@ public class StreamDemarcator implements Closeable {
             }
         }
 
-        int bytesRead;
-        do {
-            bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
-        } while (bytesRead == 0);
+        try {
+            int bytesRead;
+            do {
+                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+            } while (bytesRead == 0);
 
-        if (bytesRead != -1) {
-            this.readAheadLength = this.index + bytesRead;
-            if (this.readAheadLength > this.maxDataSize) {
-                throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
+            if (bytesRead != -1) {
+                this.readAheadLength = this.index + bytesRead;
+                if (this.readAheadLength > this.maxDataSize) {
+                    throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
+                }
             }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed while reading InputStream", e);
         }
     }
 
@@ -191,9 +188,4 @@ public class StreamDemarcator implements Closeable {
             throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
         }
     }
-
-    @Override
-    public void close() throws IOException {
-        is.close();
-    }
 }

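As context for the API restored above: after this revert, StreamDemarcator no longer implements Closeable and nextToken() no longer declares IOException, so read failures surface as IllegalStateException instead. A minimal usage sketch against the restored signatures (the sample data and delimiter are illustrative only):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.nifi.stream.io.util.StreamDemarcator;

    public class StreamDemarcatorSketch {
        public static void main(String[] args) {
            ByteArrayInputStream in = new ByteArrayInputStream(
                    "first,second,third".getBytes(StandardCharsets.UTF_8));

            // demarcate on ',' with a maximum token size of 1000 bytes
            StreamDemarcator demarcator = new StreamDemarcator(
                    in, ",".getBytes(StandardCharsets.UTF_8), 1000);

            byte[] token;
            while ((token = demarcator.nextToken()) != null) { // null signals end of stream
                System.out.println(new String(token, StandardCharsets.UTF_8));
            }
            // no close() on the demarcator after this revert; the caller closes the stream
        }
    }
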
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
index 66d2668..93082a2 100644
--- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
+++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
@@ -66,7 +65,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateNoDelimiter() throws IOException {
+    public void validateNoDelimiter() {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -77,7 +76,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateNoDelimiterSmallInitialBuffer() throws IOException {
+    public void validateNoDelimiterSmallInitialBuffer() {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 1);
@@ -85,7 +84,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateSingleByteDelimiter() throws IOException {
+    public void validateSingleByteDelimiter() {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -96,7 +95,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateDelimiterAtTheBeginning() throws IOException {
+    public void validateDelimiterAtTheBeginning() {
         String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -107,7 +106,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateEmptyDelimiterSegments() throws IOException {
+    public void validateEmptyDelimiterSegments() {
         String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
@@ -118,7 +117,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateSingleByteDelimiterSmallInitialBuffer() throws IOException {
+    public void validateSingleByteDelimiterSmallInitialBuffer() {
         String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2);
@@ -129,7 +128,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiter() throws IOException {
+    public void validateWithMultiByteDelimiter() {
         String data = "foodaabardaabazzz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -140,7 +139,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiterAtTheBeginning() throws IOException {
+    public void validateWithMultiByteDelimiterAtTheBeginning() {
         String data = "daafoodaabardaabazzz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
@@ -151,7 +150,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteDelimiterSmallInitialBuffer() throws IOException {
+    public void validateWithMultiByteDelimiterSmallInitialBuffer() {
         String data = "foodaabarffdaabazz";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1);
@@ -162,7 +161,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteCharsNoDelimiter() throws IOException {
+    public void validateWithMultiByteCharsNoDelimiter() {
         String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000);
@@ -173,7 +172,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() throws IOException {
+    public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() {
         String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
         StreamDemarcator scanner = new StreamDemarcator(is, null, 1000, 2);
@@ -184,7 +183,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateWithComplexDelimiter() throws IOException {
+    public void validateWithComplexDelimiter() {
         String data = "THIS IS MY TEXT<MYDELIMITER>THIS IS MY NEW TEXT<MYDELIMITER>THIS IS MY NEWEST TEXT";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
         StreamDemarcator scanner = new StreamDemarcator(is, "<MYDELIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
@@ -194,8 +193,8 @@ public class StreamDemarcatorTest {
         assertNull(scanner.nextToken());
     }
 
-    @Test(expected = IOException.class)
-    public void validateMaxBufferSize() throws IOException {
+    @Test(expected = IllegalStateException.class)
+    public void validateMaxBufferSize() {
         String data = "THIS IS MY TEXT<MY DELIMITER>THIS IS MY NEW TEXT<MY DELIMITER>THIS IS MY NEWEST TEXT";
         ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
         StreamDemarcator scanner = new StreamDemarcator(is, "<MY DELIMITER>".getBytes(StandardCharsets.UTF_8), 20);
@@ -203,7 +202,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() throws IOException {
+    public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, null, 20);
         byte[] b = scanner.nextToken();
@@ -211,7 +210,7 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void validateScannerHandlesNegativeOneByteInputs() throws IOException {
+    public void validateScannerHandlesNegativeOneByteInputs() {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
         byte[] b = scanner.nextToken();
@@ -219,59 +218,10 @@ public class StreamDemarcatorTest {
     }
 
     @Test
-    public void verifyScannerHandlesNegativeOneByteDelimiter() throws IOException {
+    public void verifyScannerHandlesNegativeOneByteDelimiter() {
         ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
         StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
         assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0, 0 });
         assertArrayEquals(scanner.nextToken(), new byte[] { 0, 0, 0 });
     }
-
-    @Test
-    public void testWithoutTrailingDelimiter() throws IOException {
-        final byte[] inputData = "Larger Message First\nSmall".getBytes(StandardCharsets.UTF_8);
-        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
-        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000);
-
-        final byte[] first = scanner.nextToken();
-        final byte[] second = scanner.nextToken();
-        assertNotNull(first);
-        assertNotNull(second);
-
-        assertEquals("Larger Message First", new String(first, StandardCharsets.UTF_8));
-        assertEquals("Small", new String(second, StandardCharsets.UTF_8));
-    }
-
-    @Test
-    public void testOnBufferSplitNoTrailingDelimiter() throws IOException {
-        final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8);
-        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
-        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
-
-        final byte[] first = scanner.nextToken();
-        final byte[] second = scanner.nextToken();
-        assertNotNull(first);
-        assertNotNull(second);
-
-        assertArrayEquals(first, new byte[] {'Y', 'e', 's'});
-        assertArrayEquals(second, new byte[] {'N', 'o'});
-    }
-
-    @Test
-    public void testOnBufferSplit() throws IOException {
-        final byte[] inputData = "123\n456\n789".getBytes(StandardCharsets.UTF_8);
-        ByteArrayInputStream is = new ByteArrayInputStream(inputData);
-        StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3);
-
-        final byte[] first = scanner.nextToken();
-        final byte[] second = scanner.nextToken();
-        final byte[] third = scanner.nextToken();
-        assertNotNull(first);
-        assertNotNull(second);
-        assertNotNull(third);
-
-        assertArrayEquals(first, new byte[] {'1', '2', '3'});
-        assertArrayEquals(second, new byte[] {'4', '5', '6'});
-        assertArrayEquals(third, new byte[] {'7', '8', '9'});
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 7c0cc0f..2a1451a 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -369,55 +369,54 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     @Override
-    public MockFlowFile enqueue(final Path path) throws IOException {
-        return enqueue(path, new HashMap<String, String>());
+    public void enqueue(final Path path) throws IOException {
+        enqueue(path, new HashMap<String, String>());
     }
 
     @Override
-    public MockFlowFile enqueue(final Path path, final Map<String, String> attributes) throws IOException {
+    public void enqueue(final Path path, final Map<String, String> attributes) throws IOException {
         final Map<String, String> modifiedAttributes = new HashMap<>(attributes);
         if (!modifiedAttributes.containsKey(CoreAttributes.FILENAME.key())) {
             modifiedAttributes.put(CoreAttributes.FILENAME.key(), path.toFile().getName());
         }
         try (final InputStream in = Files.newInputStream(path)) {
-            return enqueue(in, modifiedAttributes);
+            enqueue(in, modifiedAttributes);
         }
     }
 
     @Override
-    public MockFlowFile enqueue(final byte[] data) {
-        return enqueue(data, new HashMap<String, String>());
+    public void enqueue(final byte[] data) {
+        enqueue(data, new HashMap<String, String>());
     }
 
     @Override
-    public MockFlowFile enqueue(final String data) {
-        return enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
+    public void enqueue(final String data) {
+        enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
     }
 
     @Override
-    public MockFlowFile enqueue(final byte[] data, final Map<String, String> attributes) {
-        return enqueue(new ByteArrayInputStream(data), attributes);
+    public void enqueue(final byte[] data, final Map<String, String> attributes) {
+        enqueue(new ByteArrayInputStream(data), attributes);
     }
 
     @Override
-    public MockFlowFile enqueue(final String data, final Map<String, String> attributes) {
-        return enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
+    public void enqueue(final String data, final Map<String, String> attributes) {
+        enqueue(data.getBytes(StandardCharsets.UTF_8), attributes);
     }
 
 
     @Override
-    public MockFlowFile enqueue(final InputStream data) {
-        return enqueue(data, new HashMap<String, String>());
+    public void enqueue(final InputStream data) {
+        enqueue(data, new HashMap<String, String>());
     }
 
     @Override
-    public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
+    public void enqueue(final InputStream data, final Map<String, String> attributes) {
         final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor);
         MockFlowFile flowFile = session.create();
         flowFile = session.importFrom(data, flowFile);
         flowFile = session.putAllAttributes(flowFile, attributes);
         enqueue(flowFile);
-        return flowFile;
     }
 
     @Override
@@ -879,20 +878,17 @@ public class StandardProcessorTestRunner implements TestRunner {
     @Override
     public void assertAllConditionsMet(final Relationship relationship, Predicate<MockFlowFile> predicate) {
 
-        if (predicate==null) {
+        if (predicate==null)
             Assert.fail("predicate cannot be null");
-        }
 
         final List<MockFlowFile> flowFiles = getFlowFilesForRelationship(relationship);
 
-        if (flowFiles.isEmpty()) {
+        if (flowFiles.isEmpty())
             Assert.fail("Relationship " + relationship.getName() + " does not contain any FlowFile");
-        }
 
         for (MockFlowFile flowFile : flowFiles) {
-            if (predicate.test(flowFile)==false) {
+            if (predicate.test(flowFile)==false)
                 Assert.fail("FlowFile " + flowFile + " does not meet all condition");
-            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 1c014c3..0512416 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -383,7 +383,7 @@ public interface TestRunner {
      * @param path to read content from
      * @throws IOException if unable to read content
      */
-    MockFlowFile enqueue(Path path) throws IOException;
+    void enqueue(Path path) throws IOException;
 
     /**
      * Reads the content from the given {@link Path} into memory and creates a
@@ -394,7 +394,7 @@ public interface TestRunner {
      * @param attributes attributes to use for new flow file
      * @throws IOException if unable to read content
      */
-    MockFlowFile enqueue(Path path, Map<String, String> attributes) throws IOException;
+    void enqueue(Path path, Map<String, String> attributes) throws IOException;
 
     /**
      * Copies the content from the given byte array into memory and creates a
@@ -403,7 +403,7 @@ public interface TestRunner {
      *
      * @param data to enqueue
      */
-    MockFlowFile enqueue(byte[] data);
+    void enqueue(byte[] data);
 
     /**
      * Creates a FlowFile with the content set to the given string (in UTF-8 format), with no attributes,
@@ -411,7 +411,7 @@ public interface TestRunner {
      *
      * @param data to enqueue
      */
-    MockFlowFile enqueue(String data);
+    void enqueue(String data);
 
     /**
      * Copies the content from the given byte array into memory and creates a
@@ -421,7 +421,7 @@ public interface TestRunner {
      * @param data to enqueue
      * @param attributes to use for enqueued item
      */
-    MockFlowFile enqueue(byte[] data, Map<String, String> attributes);
+    void enqueue(byte[] data, Map<String, String> attributes);
 
     /**
      * Creates a FlowFile with the content set to the given string (in UTF-8 format), with the given attributes,
@@ -430,7 +430,7 @@ public interface TestRunner {
      * @param data to enqueue
      * @param attributes to use for enqueued item
      */
-    MockFlowFile enqueue(String data, Map<String, String> attributes);
+    void enqueue(String data, Map<String, String> attributes);
 
     /**
      * Reads the content from the given {@link InputStream} into memory and
@@ -439,7 +439,7 @@ public interface TestRunner {
      *
      * @param data to source data from
      */
-    MockFlowFile enqueue(InputStream data);
+    void enqueue(InputStream data);
 
     /**
      * Reads the content from the given {@link InputStream} into memory and
@@ -449,7 +449,7 @@ public interface TestRunner {
      * @param data source of data
      * @param attributes to use for flow files
      */
-    MockFlowFile enqueue(InputStream data, Map<String, String> attributes);
+    void enqueue(InputStream data, Map<String, String> attributes);
 
     /**
      * Copies the contents of the given {@link MockFlowFile} into a byte array

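To illustrate the reverted contract: enqueue(...) returns void again, so a test can no longer capture the enqueued MockFlowFile directly and instead retrieves results from a relationship after the run. A hypothetical snippet (MyProcessor and REL_SUCCESS are placeholders, not part of this commit):

    TestRunner runner = TestRunners.newTestRunner(new MyProcessor());
    runner.enqueue("hello"); // returns void after this revert
    runner.run();

    // fetch the resulting FlowFile from the outgoing relationship instead
    List<MockFlowFile> results = runner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS);
    results.get(0).assertContentEquals("hello");
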
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 8ca3494..b061fcc 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -223,7 +223,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
                 ? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                 : null;
-        final Map<String, Object> props = new HashMap<>();
+        final Map<String, String> props = new HashMap<>();
         KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index baacdc7..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
-        final Map<String, Object> kafkaProperties,
+            final Map<String, String> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
             final String keyEncoding,
@@ -148,7 +148,7 @@ public class ConsumerPool implements Closeable {
         });
     }
 
-    private void closeConsumer(final Consumer<?, ?> consumer) {
+    private void closeConsumer(final Consumer consumer) {
         consumerClosedCountRef.incrementAndGet();
         try {
             consumer.unsubscribe();

http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
deleted file mode 100644
index e7d5cb7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-public class InFlightMessageTracker {
-    private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
-    private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
-    private final Object progressMutex = new Object();
-
-    public void incrementAcknowledgedCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
-        counter.incrementAcknowledgedCount();
-
-        synchronized (progressMutex) {
-            progressMutex.notify();
-        }
-    }
-
-    public int getAcknowledgedCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.get(flowFile);
-        return (counter == null) ? 0 : counter.getAcknowledgedCount();
-    }
-
-    public void incrementSentCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
-        counter.incrementSentCount();
-    }
-
-    public int getSentCount(final FlowFile flowFile) {
-        final Counts counter = messageCountsByFlowFile.get(flowFile);
-        return (counter == null) ? 0 : counter.getSentCount();
-    }
-
-    public void fail(final FlowFile flowFile, final Exception exception) {
-        failures.putIfAbsent(flowFile, exception);
-
-        synchronized (progressMutex) {
-            progressMutex.notify();
-        }
-    }
-
-    public Exception getFailure(final FlowFile flowFile) {
-        return failures.get(flowFile);
-    }
-
-    public boolean isFailed(final FlowFile flowFile) {
-        return getFailure(flowFile) != null;
-    }
-
-    public void reset() {
-        messageCountsByFlowFile.clear();
-        failures.clear();
-    }
-
-    public PublishResult failOutstanding(final Exception exception) {
-        messageCountsByFlowFile.keySet().stream()
-            .filter(ff -> !isComplete(ff))
-            .filter(ff -> !failures.containsKey(ff))
-            .forEach(ff -> failures.put(ff, exception));
-
-        return createPublishResult();
-    }
-
-    private boolean isComplete(final FlowFile flowFile) {
-        final Counts counts = messageCountsByFlowFile.get(flowFile);
-        if (counts.getAcknowledgedCount() == counts.getSentCount()) {
-            // all messages received successfully.
-            return true;
-        }
-
-        if (failures.containsKey(flowFile)) {
-            // FlowFile failed so is complete
-            return true;
-        }
-
-        return false;
-    }
-
-    private boolean isComplete() {
-        return messageCountsByFlowFile.keySet().stream()
-            .allMatch(flowFile -> isComplete(flowFile));
-    }
-
-    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
-        final long startTime = System.nanoTime();
-        final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
-
-        while (System.nanoTime() < maxTime) {
-            synchronized (progressMutex) {
-                if (isComplete()) {
-                    return;
-                }
-
-                progressMutex.wait(millis);
-            }
-        }
-
-        throw new TimeoutException();
-    }
-
-
-    PublishResult createPublishResult() {
-        return new PublishResult() {
-            @Override
-            public Collection<FlowFile> getSuccessfulFlowFiles() {
-                if (failures.isEmpty()) {
-                    return messageCountsByFlowFile.keySet();
-                }
-
-                final Set<FlowFile> flowFiles = new HashSet<>(messageCountsByFlowFile.keySet());
-                flowFiles.removeAll(failures.keySet());
-                return flowFiles;
-            }
-
-            @Override
-            public Collection<FlowFile> getFailedFlowFiles() {
-                return failures.keySet();
-            }
-
-            @Override
-            public int getSuccessfulMessageCount(final FlowFile flowFile) {
-                return getAcknowledgedCount(flowFile);
-            }
-
-            @Override
-            public Exception getReasonForFailure(final FlowFile flowFile) {
-                return getFailure(flowFile);
-            }
-        };
-    }
-
-    public static class Counts {
-        private final AtomicInteger sentCount = new AtomicInteger(0);
-        private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
-
-        public void incrementSentCount() {
-            sentCount.incrementAndGet();
-        }
-
-        public void incrementAcknowledgedCount() {
-            acknowledgedCount.incrementAndGet();
-        }
-
-        public int getAcknowledgedCount() {
-            return acknowledgedCount.get();
-        }
-
-        public int getSentCount() {
-            return sentCount.get();
-        }
-    }
-}

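For reference, the tracker deleted above implemented a sent/acked counting pattern. A rough sketch of its intended use, reconstructed solely from the methods visible in this diff (flowFile and exception are placeholders):

    InFlightMessageTracker tracker = new InFlightMessageTracker();

    // record each message handed to the producer
    tracker.incrementSentCount(flowFile);

    // the producer's completion callback then reports the outcome
    tracker.incrementAcknowledgedCount(flowFile); // on success
    // tracker.fail(flowFile, exception);         // on failure

    try {
        tracker.awaitCompletion(5000L); // block up to 5 seconds for all ACKs
        PublishResult result = tracker.createPublishResult();
    } catch (TimeoutException e) {
        PublishResult partial = tracker.failOutstanding(e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
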
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3d09f2d..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,9 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
@@ -187,7 +186,7 @@ final class KafkaProcessorUtils {
 
         final Class<?> classType;
 
-        public KafkaConfigValidator(final Class<?> classType) {
+        public KafkaConfigValidator(final Class classType) {
             this.classType = classType;
         }
 
@@ -212,8 +211,7 @@ final class KafkaProcessorUtils {
         return builder.toString();
     }
 
-
-    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+    static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
         for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
             if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
                 // Translate SSLContext Service configuration into Kafka properties
@@ -232,33 +230,28 @@ final class KafkaProcessorUtils {
                     mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
                 }
             }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+            String pName = propertyDescriptor.getName();
+            String pValue = propertyDescriptor.isExpressionLanguageSupported()
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null) {
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
+            if (pValue != null) {
+                if (pName.endsWith(".ms")) { // kafka standard time notation
+                    pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
                 }
-
-                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
+                if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+                    mapToPopulate.put(pName, pValue);
                 }
             }
         }
     }
 
-    private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+    private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
         return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
     }
 
-    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
+    private static Set<String> getPublicStaticStringFieldValues(final Class... classes) {
         final Set<String> strings = new HashSet<>();
-        for (final Class<?> classType : classes) {
+        for (final Class classType : classes) {
             for (final Field field : classType.getDeclaredFields()) {
                 if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
                     try {

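As background on the ".ms" branch above: the restored code routes every such value through FormatUtils.getTimeDuration(), which converts NiFi's time-period notation into a millisecond count, whereas the reverted NIFI-2865 variant additionally skipped the conversion when the value was already a plain number. A small illustration (values are examples):

    import java.util.concurrent.TimeUnit;
    import org.apache.nifi.util.FormatUtils;

    // NiFi time-period notation such as "5 secs" becomes a millisecond count
    long millis = FormatUtils.getTimeDuration("5 secs", TimeUnit.MILLISECONDS); // 5000
    String pValue = String.valueOf(millis); // "5000" is the value placed into the Kafka properties map
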
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
new file mode 100644
index 0000000..31a084f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+/**
+ * Wrapper over {@link KafkaProducer} to assist the {@link PublishKafka} processor
+ * with sending the contents of {@link FlowFile}s to Kafka.
+ */
+class KafkaPublisher implements Closeable {
+
+    private final Producer<byte[], byte[]> kafkaProducer;
+
+    private volatile long ackWaitTime = 30000;
+
+    private final ComponentLog componentLog;
+
+    private final int ackCheckSize;
+
+    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
+        this(kafkaProperties, 100, componentLog);
+    }
+
+    /**
+     * Creates an instance of this class as well as an instance of the
+     * corresponding Kafka {@link KafkaProducer} using the provided Kafka
+     * configuration properties.
+     *
+     * @param kafkaProperties instance of {@link Properties} used to bootstrap
+     * the {@link KafkaProducer}
+     * @param ackCheckSize the number of sent messages between checks of the
+     * outstanding ACKs (see {@link #publish(PublishingContext)})
+     * @param componentLog the processor's logger, used to report ACK failures
+     */
+    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
+        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
+        this.ackCheckSize = ackCheckSize;
+        this.componentLog = componentLog;
+    }
+
+    /**
+     * Publishes messages to a Kafka topic. It uses {@link StreamDemarcator} to
+     * determine how many messages will be sent to Kafka from a provided
+     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+     * It supports two publishing modes:
+     * <ul>
+     * <li>Sending all messages constructed from the
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * <li>Sending only unacknowledged messages constructed from the
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * </ul>
+     * The unacknowledged messages are determined from the value of
+     * {@link PublishingContext#getLastAckedMessageIndex()}.
+     * <br>
+     * This method assumes content stream affinity where it is expected that the
+     * content stream that represents the same Kafka message(s) will remain the
+     * same across possible retries. This is required specifically for cases
+     * where a delimiter is used and a single content stream may represent
+     * multiple Kafka messages. The
+     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+     * index of the last ACKed message, so upon retry only messages with the
+     * higher index are sent.
+     *
+     * @param publishingContext instance of {@link PublishingContext} which holds
+     * context information about the message(s) to be sent.
+     * @return a {@link KafkaPublisherResult} with the number of messages sent
+     * and the index of the last acknowledged message.
+     */
+    KafkaPublisherResult publish(PublishingContext publishingContext) {
+        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+        byte[] messageBytes;
+        int tokenCounter = 0;
+        boolean continueSending = true;
+        KafkaPublisherResult result = null;
+        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+            if (prevLastAckedMessageIndex < tokenCounter) {
+                ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
+                resultFutures.add(this.kafkaProducer.send(message));
+
+                if (tokenCounter % this.ackCheckSize == 0) {
+                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+                    resultFutures.clear();
+                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
+                        continueSending = false;
+                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+                    }
+                    prevLastAckedMessageIndex = lastAckedMessageIndex;
+                }
+            }
+        }
+
+        if (result == null) {
+            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+            resultFutures.clear();
+            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
+        }
+        return result;
+    }
+
+    /**
+     * Sets the time this publisher will wait for the {@link Future#get()}
+     * operation (the Future returned by
+     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
+     * out.
+     *
+     * This value will also be used as a timeout when closing the underlying
+     * {@link KafkaProducer}. See {@link #close()}.
+     */
+    void setAckWaitTime(long ackWaitTime) {
+        this.ackWaitTime = ackWaitTime;
+    }
+
+    /**
+     * This operation will process ACKs from Kafka in the order in which
+     * {@link KafkaProducer#send(ProducerRecord)} invocations were made, returning
+     * the index of the last ACKed message. Within this operation, processing an
+     * ACK simply means a successful invocation of the 'get()' operation on the
+     * {@link Future} returned by the {@link KafkaProducer#send(ProducerRecord)}
+     * operation. Upon encountering any type of error while interrogating such a
+     * {@link Future}, the ACK loop ends. Messages that were not ACKed are
+     * considered non-delivered and may therefore be resent at a later time.
+     *
+     * @param sendFutures list of {@link Future}s representing results of
+     * publishing to Kafka
+     *
+     * @param lastAckMessageIndex the index of the last ACKed message. It is
+     * important to provide the last ACKed message index, especially when
+     * retrying, so the proper index is maintained.
+     */
+    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
+        boolean exceptionThrown = false;
+        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
+            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
+            try {
+                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+                lastAckMessageIndex++;
+            } catch (InterruptedException e) {
+                exceptionThrown = true;
+                Thread.currentThread().interrupt();
+                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
+            } catch (ExecutionException e) {
+                exceptionThrown = true;
+                this.warnOrError("Failed while waiting for acks from Kafka", e);
+            } catch (TimeoutException e) {
+                exceptionThrown = true;
+                this.warnOrError("Timed out while waiting for acks from Kafka", null);
+            }
+        }
+
+        return lastAckMessageIndex;
+    }
+
+    /**
+     * Will close the underlying {@link KafkaProducer}, waiting if necessary for
+     * the duration supplied via {@link #setAckWaitTime(long)}.
+     */
+    @Override
+    public void close() {
+        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Logs the given message as a warning when no exception is provided,
+     * otherwise logs it as an error together with the exception.
+     */
+    private void warnOrError(String message, Exception e) {
+        if (e == null) {
+            this.componentLog.warn(message);
+        } else {
+            this.componentLog.error(message, e);
+        }
+    }
+
+    /**
+     * Encapsulates the result received from publishing messages to Kafka
+     */
+    static class KafkaPublisherResult {
+
+        private final int messagesSent;
+        private final int lastMessageAcked;
+
+        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
+            this.messagesSent = messagesSent;
+            this.lastMessageAcked = lastMessageAcked;
+        }
+
+        public int getMessagesSent() {
+            return this.messagesSent;
+        }
+
+        public int getLastMessageAcked() {
+            return this.lastMessageAcked;
+        }
+
+        public boolean isAllAcked() {
+            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
+        }
+
+        @Override
+        public String toString() {
+            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
+        }
+    }
+}
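
A sketch of how a processor might drive the publisher restored above. The PublishingContext constructor is not shown in this part of the diff, so its construction below is an assumption, as are the broker address and topic name:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    try (KafkaPublisher publisher = new KafkaPublisher(props, getLogger())) { // getLogger(): the processor's ComponentLog
        publisher.setAckWaitTime(30000L);

        // hypothetical construction; see PublishingContext.java in this commit for the real signature
        PublishingContext context = new PublishingContext(contentStream, "my-topic");

        KafkaPublisher.KafkaPublisherResult result = publisher.publish(context);
        if (!result.isAllAcked()) {
            // route the FlowFile to failure; a retry can resend from result.getLastMessageAcked() + 1
        }
    }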