You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/06 14:06:15 UTC

[2/5] nifi git commit: NIFI-2865: Refactored PublishKafka and PublishKafka_0_10 to allow batching of FlowFiles within a single publish and to let messages timeout if not acknowledged

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
deleted file mode 100644
index 31a084f..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.Closeable;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.stream.io.util.StreamDemarcator;
-
-/**
- * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
- * with sending contents of the {@link FlowFile}s to Kafka.
- */
-class KafkaPublisher implements Closeable {
-
-    private final Producer<byte[], byte[]> kafkaProducer;
-
-    private volatile long ackWaitTime = 30000;
-
-    private final ComponentLog componentLog;
-
-    private final int ackCheckSize;
-
-    KafkaPublisher(Properties kafkaProperties, ComponentLog componentLog) {
-        this(kafkaProperties, 100, componentLog);
-    }
-
-    /**
-     * Creates an instance of this class as well as the instance of the
-     * corresponding Kafka {@link KafkaProducer} using provided Kafka
-     * configuration properties.
-     *
-     * @param kafkaProperties instance of {@link Properties} used to bootstrap
-     * {@link KafkaProducer}
-     */
-    KafkaPublisher(Properties kafkaProperties, int ackCheckSize, ComponentLog componentLog) {
-        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
-        this.ackCheckSize = ackCheckSize;
-        this.componentLog = componentLog;
-    }
-
-    /**
-     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
-     * determine how many messages to Kafka will be sent from a provided
-     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
-     * It supports two publishing modes:
-     * <ul>
-     * <li>Sending all messages constructed from
-     * {@link StreamDemarcator#nextToken()} operation.</li>
-     * <li>Sending only unacknowledged messages constructed from
-     * {@link StreamDemarcator#nextToken()} operation.</li>
-     * </ul>
-     * The unacknowledged messages are determined from the value of
-     * {@link PublishingContext#getLastAckedMessageIndex()}.
-     * <br>
-     * This method assumes content stream affinity where it is expected that the
-     * content stream that represents the same Kafka message(s) will remain the
-     * same across possible retries. This is required specifically for cases
-     * where delimiter is used and a single content stream may represent
-     * multiple Kafka messages. The
-     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
-     * index of the last ACKed message, so upon retry only messages with the
-     * higher index are sent.
-     *
-     * @param publishingContext instance of {@link PublishingContext} which hold
-     * context information about the message(s) to be sent.
-     * @return The index of the last successful offset.
-     */
-    KafkaPublisherResult publish(PublishingContext publishingContext) {
-        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
-                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
-
-        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
-        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
-
-        byte[] messageBytes;
-        int tokenCounter = 0;
-        boolean continueSending = true;
-        KafkaPublisherResult result = null;
-        for (; continueSending && (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
-            if (prevLastAckedMessageIndex < tokenCounter) {
-                ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(publishingContext.getTopic(), publishingContext.getKeyBytes(), messageBytes);
-                resultFutures.add(this.kafkaProducer.send(message));
-
-                if (tokenCounter % this.ackCheckSize == 0) {
-                    int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-                    resultFutures.clear();
-                    if (lastAckedMessageIndex % this.ackCheckSize != 0) {
-                        continueSending = false;
-                        result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
-                    }
-                    prevLastAckedMessageIndex = lastAckedMessageIndex;
-                }
-            }
-        }
-
-        if (result == null) {
-            int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
-            resultFutures.clear();
-            result = new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
-        }
-        return result;
-    }
-
-    /**
-     * Sets the time this publisher will wait for the {@link Future#get()}
-     * operation (the Future returned by
-     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
-     * out.
-     *
-     * This value will also be used as a timeout when closing the underlying
-     * {@link KafkaProducer}. See {@link #close()}.
-     */
-    void setAckWaitTime(long ackWaitTime) {
-        this.ackWaitTime = ackWaitTime;
-    }
-
-    /**
-     * This operation will process ACKs from Kafka in the order in which
-     * {@link KafkaProducer#send(ProducerRecord)} invocation were made returning
-     * the index of the last ACKed message. Within this operation processing ACK
-     * simply means successful invocation of 'get()' operation on the
-     * {@link Future} returned by {@link KafkaProducer#send(ProducerRecord)}
-     * operation. Upon encountering any type of error while interrogating such
-     * {@link Future} the ACK loop will end. Messages that were not ACKed would
-     * be considered non-delivered and therefore could be resent at the later
-     * time.
-     *
-     * @param sendFutures list of {@link Future}s representing results of
-     * publishing to Kafka
-     *
-     * @param lastAckMessageIndex the index of the last ACKed message. It is
-     * important to provide the last ACKed message especially while re-trying so
-     * the proper index is maintained.
-     */
-    private int processAcks(List<Future<RecordMetadata>> sendFutures, int lastAckMessageIndex) {
-        boolean exceptionThrown = false;
-        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && !exceptionThrown; segmentCounter++) {
-            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
-            try {
-                future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
-                lastAckMessageIndex++;
-            } catch (InterruptedException e) {
-                exceptionThrown = true;
-                Thread.currentThread().interrupt();
-                this.warnOrError("Interrupted while waiting for acks from Kafka", null);
-            } catch (ExecutionException e) {
-                exceptionThrown = true;
-                this.warnOrError("Failed while waiting for acks from Kafka", e);
-            } catch (TimeoutException e) {
-                exceptionThrown = true;
-                this.warnOrError("Timed out while waiting for acks from Kafka", null);
-            }
-        }
-
-        return lastAckMessageIndex;
-    }
-
-    /**
-     * Will close the underlying {@link KafkaProducer} waiting if necessary for
-     * the same duration as supplied {@link #setAckWaitTime(long)}
-     */
-    @Override
-    public void close() {
-        this.kafkaProducer.close(this.ackWaitTime, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     *
-     */
-    private void warnOrError(String message, Exception e) {
-        if (e == null) {
-            this.componentLog.warn(message);
-        } else {
-            this.componentLog.error(message, e);
-        }
-    }
-
-    /**
-     * Encapsulates the result received from publishing messages to Kafka
-     */
-    static class KafkaPublisherResult {
-
-        private final int messagesSent;
-        private final int lastMessageAcked;
-
-        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
-            this.messagesSent = messagesSent;
-            this.lastMessageAcked = lastMessageAcked;
-        }
-
-        public int getMessagesSent() {
-            return this.messagesSent;
-        }
-
-        public int getLastMessageAcked() {
-            return this.lastMessageAcked;
-        }
-
-        public boolean isAllAcked() {
-            return this.lastMessageAcked > -1 && this.messagesSent - 1 == this.lastMessageAcked;
-        }
-
-        @Override
-        public String toString() {
-            return "Sent:" + this.messagesSent + "; Last ACK:" + this.lastMessageAcked;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 4e1403d..53c64e6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.Closeable;
+import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
@@ -27,17 +28,16 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+
 import javax.xml.bind.DatatypeConverter;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
@@ -46,201 +46,192 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.9.x"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.9 producer. "
-        + "The messages to send may be individual FlowFiles or may be delimited, using a "
-        + "user-specified delimiter, such as a new-line. "
-        + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
-        + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time"
-        + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
+    + "The messages to send may be individual FlowFiles or may be delimited, using a "
+    + "user-specified delimiter, such as a new-line. "
+    + " Please note there are cases where the publisher can get into an indefinite stuck state.  We are closely monitoring"
+    + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can.  In the mean time"
+    + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
-        description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+    description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
-public class PublishKafka extends AbstractSessionFactoryProcessor {
-
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    protected static final String FAILED_PROC_ID_ATTR = "failed.proc.id";
-
-    protected static final String FAILED_LAST_ACK_IDX = "failed.last.idx";
-
-    protected static final String FAILED_TOPIC_ATTR = "failed.topic";
-
-    protected static final String FAILED_KEY_ATTR = "failed.key";
-
-    protected static final String FAILED_DELIMITER_ATTR = "failed.delimiter";
-
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+    + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+    + "be greater than 1.")
+public class PublishKafka extends AbstractProcessor {
     protected static final String MSG_COUNT = "msg.count";
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
-            "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+        "FlowFile will be routed to failure unless the message is replicated to the appropriate "
             + "number of Kafka Nodes according to the Topic configuration");
     static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
-            "FlowFile will be routed to success if the message is received by a single Kafka node, "
+        "FlowFile will be routed to success if the message is received by a single Kafka node, "
             + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
             + "but can result in data loss if a Kafka node crashes");
     static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
-            "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+        "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
             + "without waiting for a response. This provides the best performance but may result in data loss.");
 
     static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
-            Partitioners.RoundRobinPartitioner.class.getSimpleName(),
-            "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+        Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+        "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
             + "the next Partition to Partition 2, and so on, wrapping as necessary.");
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
-            "DefaultPartitioner", "Messages will be assigned to random partitions.");
+        "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
     static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
         "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
 
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
-            .name("topic")
-            .displayName("Topic Name")
-            .description("The name of the Kafka Topic to publish to.")
-            .required(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("topic")
+        .displayName("Topic Name")
+        .description("The name of the Kafka Topic to publish to.")
+        .required(true)
+        .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
 
     static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.ACKS_CONFIG)
-            .displayName("Delivery Guarantee")
-            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
-            .required(true)
-            .expressionLanguageSupported(false)
-            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
-            .defaultValue(DELIVERY_BEST_EFFORT.getValue())
-            .build();
-
-    static final PropertyDescriptor META_WAIT_TIME = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
-            .displayName("Meta Data Wait Time")
-            .description("The amount of time KafkaConsumer will wait to obtain metadata during the 'send' call before failing the "
-                    + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .defaultValue("30 sec")
-            .build();
+        .name(ProducerConfig.ACKS_CONFIG)
+        .displayName("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
+
+    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+        .displayName("Max Metadata Wait Time")
+        .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+            + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("5 sec")
+        .build();
+
+    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+        .name("ack.wait.time")
+        .displayName("Acknowledgment Wait Time")
+        .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+            + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .defaultValue("5 secs")
+        .build();
 
     static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
-            .name("max.request.size")
-            .displayName("Max Request Size")
-            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
-            .required(true)
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .defaultValue("1 MB")
-            .build();
+        .name("max.request.size")
+        .displayName("Max Request Size")
+        .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
 
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
-            .name("kafka-key")
-            .displayName("Kafka Key")
-            .description("The Key to use for the Message. "
-                    + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
-                    + "and we're not demarcating.")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .build();
+        .name("kafka-key")
+        .displayName("Kafka Key")
+        .description("The Key to use for the Message. "
+            + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+            + "and we're not demarcating.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
 
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
-            .name("key-attribute-encoding")
-            .displayName("Key Attribute Encoding")
-            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
-            .required(true)
-            .defaultValue(UTF8_ENCODING.getValue())
-            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
-            .build();
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
 
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
-            .name("message-demarcator")
-            .displayName("Message Demarcator")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
-                    + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
-                    + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
-                    + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on your OS.")
-            .build();
+        .name("message-demarcator")
+        .displayName("Message Demarcator")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+            + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+            + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
+            + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
+        .build();
 
     static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
-            .displayName("Partitioner class")
-            .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
-            .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
-            .defaultValue(RANDOM_PARTITIONING.getValue())
-            .required(false)
-            .build();
+        .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+        .displayName("Partitioner class")
+        .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+        .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+        .defaultValue(RANDOM_PARTITIONING.getValue())
+        .required(false)
+        .build();
 
     static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
-            .displayName("Compression Type")
-            .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .allowableValues("none", "gzip", "snappy", "lz4")
-            .defaultValue("none")
-            .build();
+        .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+        .displayName("Compression Type")
+        .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .allowableValues("none", "gzip", "snappy", "lz4")
+        .defaultValue("none")
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("FlowFiles for which all content was sent to Kafka.")
-            .build();
+        .name("success")
+        .description("FlowFiles for which all content was sent to Kafka.")
+        .build();
 
     static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
-            .build();
-
-    static final List<PropertyDescriptor> DESCRIPTORS;
-
-    static final Set<Relationship> RELATIONSHIPS;
-
-    private volatile String brokers;
+        .name("failure")
+        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+        .build();
 
-    private final AtomicInteger taskCounter = new AtomicInteger();
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
 
-    private volatile boolean acceptTask = true;
+    private volatile PublisherPool publisherPool = null;
 
-    /*
-     * Will ensure that list of PropertyDescriptors is build only once, since
-     * all other lifecycle methods are invoked multiple times.
-     */
     static {
-        final List<PropertyDescriptor> _descriptors = new ArrayList<>();
-        _descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
-        _descriptors.add(TOPIC);
-        _descriptors.add(DELIVERY_GUARANTEE);
-        _descriptors.add(KEY);
-        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
-        _descriptors.add(MESSAGE_DEMARCATOR);
-        _descriptors.add(MAX_REQUEST_SIZE);
-        _descriptors.add(META_WAIT_TIME);
-        _descriptors.add(PARTITION_CLASS);
-        _descriptors.add(COMPRESSION_CODEC);
-
-        DESCRIPTORS = Collections.unmodifiableList(_descriptors);
-
-        final Set<Relationship> _relationships = new HashSet<>();
-        _relationships.add(REL_SUCCESS);
-        _relationships.add(REL_FAILURE);
-        RELATIONSHIPS = Collections.unmodifiableSet(_relationships);
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        properties.add(TOPIC);
+        properties.add(DELIVERY_GUARANTEE);
+        properties.add(KEY);
+        properties.add(KEY_ATTRIBUTE_ENCODING);
+        properties.add(MESSAGE_DEMARCATOR);
+        properties.add(MAX_REQUEST_SIZE);
+        properties.add(ACK_WAIT_TIME);
+        properties.add(METADATA_WAIT_TIME);
+        properties.add(PARTITION_CLASS);
+        properties.add(COMPRESSION_CODEC);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
     }
 
     @Override
@@ -250,15 +241,17 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return DESCRIPTORS;
+        return PROPERTIES;
     }
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
-                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class)).dynamic(true)
-                .build();
+            .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+            .name(propertyDescriptorName)
+            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .dynamic(true)
+            .build();
     }
 
     @Override
@@ -266,226 +259,123 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         return KafkaProcessorUtils.validateCommonProperties(validationContext);
     }
 
-    volatile KafkaPublisher kafkaPublisher;
-
-    /**
-     * This thread-safe operation will delegate to
-     * {@link #rendezvousWithKafka(ProcessContext, ProcessSession)} after first
-     * checking and creating (if necessary) Kafka resource which could be either
-     * {@link KafkaPublisher} or {@link KafkaConsumer}. It will also close and
-     * destroy the underlying Kafka resource upon catching an {@link Exception}
-     * raised by {@link #rendezvousWithKafka(ProcessContext, ProcessSession)}.
-     * After Kafka resource is destroyed it will be re-created upon the next
-     * invocation of this operation essentially providing a self healing
-     * mechanism to deal with potentially corrupted resource.
-     * <p>
-     * Keep in mind that upon catching an exception the state of this processor
-     * will be set to no longer accept any more tasks, until Kafka resource is
-     * reset. This means that in a multi-threaded situation currently executing
-     * tasks will be given a chance to complete while no new tasks will be
-     * accepted.
-     *
-     * @param context context
-     * @param sessionFactory factory
-     */
-    @Override
-    public final void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
-        if (this.acceptTask) { // acts as a circuit breaker to allow existing tasks to wind down so 'kafkaPublisher' can be reset before new tasks are accepted.
-            this.taskCounter.incrementAndGet();
-            final ProcessSession session = sessionFactory.createSession();
-            try {
-                /*
-                 * We can't be doing double null check here since as a pattern
-                 * it only works for lazy init but not reset, which is what we
-                 * are doing here. In fact the first null check is dangerous
-                 * since 'kafkaPublisher' can become null right after its null
-                 * check passed causing subsequent NPE.
-                 */
-                synchronized (this) {
-                    if (this.kafkaPublisher == null) {
-                        this.kafkaPublisher = this.buildKafkaResource(context, session);
-                    }
-                }
-
-                /*
-                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
-                 * - ConsumeKafka - some messages were received form Kafka and 1_ FlowFile were generated
-                 * - PublishKafka0_10 - some messages were sent to Kafka based on existence of the input FlowFile
-                 */
-                boolean processed = this.rendezvousWithKafka(context, session);
-                session.commit();
-                if (!processed) {
-                    context.yield();
-                }
-            } catch (Throwable e) {
-                this.acceptTask = false;
-                session.rollback(true);
-                this.getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, e});
-            } finally {
-                synchronized (this) {
-                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
-                        this.close();
-                        this.acceptTask = true;
-                    }
-                }
-            }
-        } else {
-            this.logger.debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            this.getLogger().debug("Task was not accepted due to the processor being in 'reset' state. It will be re-submitted upon completion of the reset.");
-            context.yield();
+    private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+        PublisherPool pool = publisherPool;
+        if (pool != null) {
+            return pool;
         }
+
+        return publisherPool = createPublisherPool(context);
+    }
+
+    protected PublisherPool createPublisherPool(final ProcessContext context) {
+        final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+        return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis);
     }
 
-    /**
-     * Will call {@link Closeable#close()} on the target resource after which
-     * the target resource will be set to null. Should only be called when there
-     * are no more threads being executed on this processor or when it has been
-     * verified that only a single thread remains.
-     *
-     * @see KafkaPublisher
-     * @see KafkaConsumer
-     */
     @OnStopped
-    public void close() {
-        try {
-            if (this.kafkaPublisher != null) {
-                try {
-                    this.kafkaPublisher.close();
-                } catch (Exception e) {
-                    this.getLogger().warn("Failed while closing " + this.kafkaPublisher, e);
-                }
-            }
-        } finally {
-            this.kafkaPublisher = null;
+    public void closePool() {
+        if (publisherPool != null) {
+            publisherPool.close();
         }
+
+        publisherPool = null;
     }
 
-    /**
-     * Will rendezvous with Kafka if {@link ProcessSession} contains
-     * {@link FlowFile} producing a result {@link FlowFile}.
-     * <br>
-     * The result {@link FlowFile} that is successful is then transfered to
-     * {@link #REL_SUCCESS}
-     * <br>
-     * The result {@link FlowFile} that is failed is then transfered to
-     * {@link #REL_FAILURE}
-     *
-     */
-    protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
-        FlowFile flowFile = session.get();
-        if (flowFile != null) {
-            long start = System.nanoTime();
-            flowFile = this.doRendezvousWithKafka(flowFile, context, session);
-            Relationship relationship = REL_SUCCESS;
-            if (!this.isFailedFlowFile(flowFile)) {
-                String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                long executionDuration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                String transitUri = KafkaProcessorUtils.buildTransitURI(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue(), this.brokers, topic);
-                session.getProvenanceReporter().send(flowFile, transitUri, "Sent " + flowFile.getAttribute(MSG_COUNT) + " Kafka messages", executionDuration);
-                this.getLogger().debug("Successfully sent {} to Kafka as {} message(s) in {} millis",
-                        new Object[]{flowFile, flowFile.getAttribute(MSG_COUNT), executionDuration});
-            } else {
-                relationship = REL_FAILURE;
-                flowFile = session.penalize(flowFile);
-            }
-            session.transfer(flowFile, relationship);
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+        if (flowFiles.isEmpty()) {
+            return;
         }
-        return flowFile != null;
-    }
 
-    /**
-     * Builds and instance of {@link KafkaPublisher}.
-     */
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
-        final Map<String, String> kafkaProps = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProps);
-        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProps.put("max.request.size", String.valueOf(context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue()));
-        this.brokers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
-        final Properties props = new Properties();
-        props.putAll(kafkaProps);
-        KafkaPublisher publisher = new KafkaPublisher(props, this.getLogger());
-        return publisher;
-    }
+        final PublisherPool pool = getPublisherPool(context);
+        if (pool == null) {
+            context.yield();
+            return;
+        }
 
-    /**
-     * Will rendezvous with {@link KafkaPublisher} after building
-     * {@link PublishingContext} and will produce the resulting
-     * {@link FlowFile}. The resulting FlowFile contains all required
-     * information to determine if message publishing originated from the
-     * provided FlowFile has actually succeeded fully, partially or failed
-     * completely (see {@link #isFailedFlowFile(FlowFile)}.
-     */
-    private FlowFile doRendezvousWithKafka(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
-        final AtomicReference<KafkaPublisher.KafkaPublisherResult> publishResultRef = new AtomicReference<>();
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(InputStream contentStream) throws IOException {
-                PublishingContext publishingContext = PublishKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisher.KafkaPublisherResult result = PublishKafka.this.kafkaPublisher.publish(publishingContext);
-                publishResultRef.set(result);
+        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+
+        final long startTime = System.nanoTime();
+        try (final PublisherLease lease = pool.obtainPublisher()) {
+            // Send each FlowFile to Kafka asynchronously.
+            for (final FlowFile flowFile : flowFiles) {
+                if (!isScheduled()) {
+                    // If stopped, re-queue FlowFile instead of sending it
+                    session.transfer(flowFile);
+                    continue;
+                }
+
+                final byte[] messageKey = getMessageKey(flowFile, context);
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                final byte[] demarcatorBytes;
+                if (useDemarcator) {
+                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                } else {
+                    demarcatorBytes = null;
+                }
+
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+                        }
+                    }
+                });
             }
-        });
 
-        FlowFile resultFile = publishResultRef.get().isAllAcked()
-                ? this.cleanUpFlowFileIfNecessary(flowFile, session)
-                : session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(publishResultRef.get().getLastMessageAcked(), flowFile, context));
+            // Complete the send
+            final PublishResult publishResult = lease.complete();
 
-        if (!this.isFailedFlowFile(resultFile)) {
-            resultFile = session.putAttribute(resultFile, MSG_COUNT, String.valueOf(publishResultRef.get().getMessagesSent()));
-        }
-        return resultFile;
-    }
+            // Transfer any successful FlowFiles.
+            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+            for (FlowFile success : publishResult.getSuccessfulFlowFiles()) {
+                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
 
-    /**
-     * Builds {@link PublishingContext} for message(s) to be sent to Kafka.
-     * {@link PublishingContext} contains all contextual information required by
-     * {@link KafkaPublisher} to publish to Kafka. Such information contains
-     * things like topic name, content stream, delimiter, key and last ACKed
-     * message for cases where provided FlowFile is being retried (failed in the
-     * past).
-     * <br>
-     * For the clean FlowFile (file that has been sent for the first time),
-     * PublishingContext will be built form {@link ProcessContext} associated
-     * with this invocation.
-     * <br>
-     * For the failed FlowFile, {@link PublishingContext} will be built from
-     * attributes of that FlowFile which by then will already contain required
-     * information (e.g., topic, key, delimiter etc.). This is required to
-     * ensure the affinity of the retry in the even where processor
-     * configuration has changed. However keep in mind that failed FlowFile is
-     * only considered a failed FlowFile if it is being re-processed by the same
-     * processor (determined via {@link #FAILED_PROC_ID_ATTR}, see
-     * {@link #isFailedFlowFile(FlowFile)}). If failed FlowFile is being sent to
-     * another PublishKafka0_10 processor it is treated as a fresh FlowFile
-     * regardless if it has #FAILED* attributes set.
-     */
-    private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        final byte[] keyBytes = getMessageKey(flowFile, context);
-
-        final String topicName;
-        final byte[] delimiterBytes;
-        int lastAckedMessageIndex = -1;
-        if (this.isFailedFlowFile(flowFile)) {
-            lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
-            topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-        } else {
-            topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
-                    .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
-        }
+                final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                session.adjustCounter("Messages Sent", msgCount, true);
+
+                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                session.transfer(success, REL_SUCCESS);
+            }
+
+            // Transfer any failures.
+            for (final FlowFile failure : publishResult.getFailedFlowFiles()) {
+                final int successCount = publishResult.getSuccessfulMessageCount(failure);
+                if (successCount > 0) {
+                    getLogger().error("Failed to send some messages for {} to Kafka, but {} messages were acknowledged by Kafka. Routing to failure due to {}",
+                        new Object[] {failure, successCount, publishResult.getReasonForFailure(failure)});
+                } else {
+                    getLogger().error("Failed to send all message for {} to Kafka; routing to failure due to {}",
+                        new Object[] {failure, publishResult.getReasonForFailure(failure)});
+                }
 
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex,
-                context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue());
-        publishingContext.setKeyBytes(keyBytes);
-        publishingContext.setDelimiterBytes(delimiterBytes);
-        return publishingContext;
+                session.transfer(failure, REL_FAILURE);
+            }
+        }
     }
 
+
     private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+            return null;
+        }
+
         final String uninterpretedKey;
         if (context.getProperty(KEY).isSet()) {
             uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
@@ -504,51 +394,4 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
 
         return DatatypeConverter.parseHexBinary(uninterpretedKey);
     }
-
-    /**
-     * Will remove FAILED_* attributes if FlowFile is no longer considered a
-     * failed FlowFile
-     *
-     * @see #isFailedFlowFile(FlowFile)
-     */
-    private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) {
-        if (this.isFailedFlowFile(flowFile)) {
-            Set<String> keysToRemove = new HashSet<>();
-            keysToRemove.add(FAILED_DELIMITER_ATTR);
-            keysToRemove.add(FAILED_KEY_ATTR);
-            keysToRemove.add(FAILED_TOPIC_ATTR);
-            keysToRemove.add(FAILED_PROC_ID_ATTR);
-            keysToRemove.add(FAILED_LAST_ACK_IDX);
-            flowFile = session.removeAllAttributes(flowFile, keysToRemove);
-        }
-        return flowFile;
-    }
-
-    /**
-     * Builds a {@link Map} of FAILED_* attributes
-     *
-     * @see #FAILED_PROC_ID_ATTR
-     * @see #FAILED_LAST_ACK_IDX
-     * @see #FAILED_TOPIC_ATTR
-     * @see #FAILED_KEY_ATTR
-     * @see #FAILED_DELIMITER_ATTR
-     */
-    private Map<String, String> buildFailedFlowFileAttributes(int lastAckedMessageIndex, FlowFile sourceFlowFile, ProcessContext context) {
-        Map<String, String> attributes = new HashMap<>();
-        attributes.put(FAILED_PROC_ID_ATTR, this.getIdentifier());
-        attributes.put(FAILED_LAST_ACK_IDX, String.valueOf(lastAckedMessageIndex));
-        attributes.put(FAILED_TOPIC_ATTR, context.getProperty(TOPIC).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_KEY_ATTR, context.getProperty(KEY).evaluateAttributeExpressions(sourceFlowFile).getValue());
-        attributes.put(FAILED_DELIMITER_ATTR, context.getProperty(MESSAGE_DEMARCATOR).isSet()
-                ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(sourceFlowFile).getValue() : null);
-        return attributes;
-    }
-
-    /**
-     * Returns 'true' if provided FlowFile is a failed FlowFile. A failed
-     * FlowFile contains {@link #FAILED_PROC_ID_ATTR}.
-     */
-    private boolean isFailedFlowFile(FlowFile flowFile) {
-        return this.getIdentifier().equals(flowFile.getAttribute(FAILED_PROC_ID_ATTR));
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
new file mode 100644
index 0000000..b685265
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface PublishResult {
+    Collection<FlowFile> getSuccessfulFlowFiles();
+
+    Collection<FlowFile> getFailedFlowFiles();
+
+    int getSuccessfulMessageCount(FlowFile flowFile);
+
+    Exception getReasonForFailure(FlowFile flowFile);
+
+
+    public static final PublishResult EMPTY = new PublishResult() {
+        @Override
+        public Collection<FlowFile> getSuccessfulFlowFiles() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public Collection<FlowFile> getFailedFlowFiles() {
+            return Collections.emptyList();
+        }
+
+        @Override
+        public int getSuccessfulMessageCount(FlowFile flowFile) {
+            return 0;
+        }
+
+        @Override
+        public Exception getReasonForFailure(FlowFile flowFile) {
+            return null;
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
new file mode 100644
index 0000000..b67e8a8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+public class PublisherLease implements Closeable {
+    private final ComponentLog logger;
+    private final Producer<byte[], byte[]> producer;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+    private volatile boolean poisoned = false;
+
+    private InFlightMessageTracker tracker;
+
+    public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger) {
+        this.producer = producer;
+        this.maxMessageSize = maxMessageSize;
+        this.logger = logger;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+    }
+
+    protected void poison() {
+        this.poisoned = true;
+    }
+
+    public boolean isPoisoned() {
+        return poisoned;
+    }
+
+    void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+        if (tracker == null) {
+            tracker = new InFlightMessageTracker();
+        }
+
+        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+            byte[] messageContent;
+            try {
+                while ((messageContent = demarcator.nextToken()) != null) {
+                    // We do not want to use any key if we have a demarcator because that would result in
+                    // the key being the same for multiple messages
+                    final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
+                    publish(flowFile, keyToUse, messageContent, topic, tracker);
+
+                    if (tracker.isFailed(flowFile)) {
+                        // If we have a failure, don't try to send anything else.
+                        return;
+                    }
+                }
+            } catch (final TokenTooLargeException ttle) {
+                tracker.fail(flowFile, ttle);
+            }
+        } catch (final Exception e) {
+            tracker.fail(flowFile, e);
+            poison();
+            throw e;
+        }
+    }
+
+    private void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+        producer.send(record, new Callback() {
+            @Override
+            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+                if (exception == null) {
+                    tracker.incrementAcknowledgedCount(flowFile);
+                } else {
+                    tracker.fail(flowFile, exception);
+                    poison();
+                }
+            }
+        });
+
+        tracker.incrementSentCount(flowFile);
+    }
+
+    public PublishResult complete() {
+        if (tracker == null) {
+            throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
+        }
+
+        producer.flush();
+
+        try {
+            tracker.awaitCompletion(maxAckWaitMillis);
+            return tracker.createPublishResult();
+        } catch (final InterruptedException e) {
+            logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            Thread.currentThread().interrupt();
+            return tracker.failOutstanding(e);
+        } catch (final TimeoutException e) {
+            logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+            return tracker.failOutstanding(e);
+        } finally {
+            tracker = null;
+        }
+    }
+
+    @Override
+    public void close() {
+        producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+        tracker = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
new file mode 100644
index 0000000..5902b03
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
+public class PublisherPool implements Closeable {
+    private final ComponentLog logger;
+    private final BlockingQueue<PublisherLease> publisherQueue;
+    private final Map<String, Object> kafkaProperties;
+    private final int maxMessageSize;
+    private final long maxAckWaitMillis;
+
+    private volatile boolean closed = false;
+
+    PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis) {
+        this.logger = logger;
+        this.publisherQueue = new LinkedBlockingQueue<>();
+        this.kafkaProperties = kafkaProperties;
+        this.maxMessageSize = maxMessageSize;
+        this.maxAckWaitMillis = maxAckWaitMillis;
+    }
+
+    public PublisherLease obtainPublisher() {
+        if (isClosed()) {
+            throw new IllegalStateException("Connection Pool is closed");
+        }
+
+        PublisherLease lease = publisherQueue.poll();
+        if (lease != null) {
+            return lease;
+        }
+
+        lease = createLease();
+        return lease;
+    }
+
+    private PublisherLease createLease() {
+        final Producer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProperties);
+        final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger) {
+            @Override
+            public void close() {
+                if (isPoisoned() || isClosed()) {
+                    super.close();
+                } else {
+                    publisherQueue.offer(this);
+                }
+            }
+        };
+
+        return lease;
+    }
+
+    public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+
+        PublisherLease lease;
+        while ((lease = publisherQueue.poll()) != null) {
+            lease.close();
+        }
+    }
+
+    /**
+     * Returns the number of leases that are currently available
+     *
+     * @return the number of leases currently available
+     */
+    protected int available() {
+        return publisherQueue.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
deleted file mode 100644
index 1513481..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishingContext.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-/**
- * Holder of context information used by {@link KafkaPublisher} required to
- * publish messages to Kafka.
- */
-class PublishingContext {
-
-    private final InputStream contentStream;
-
-    private final String topic;
-
-    private final int lastAckedMessageIndex;
-
-    private final int maxRequestSize;
-
-    private byte[] keyBytes;
-
-    private byte[] delimiterBytes;
-
-    PublishingContext(InputStream contentStream, String topic) {
-        this(contentStream, topic, -1);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        this(contentStream, topic, lastAckedMessageIndex, 1048576);
-    }
-
-    PublishingContext(InputStream contentStream, String topic, int lastAckedMessageIndex, int maxRequestSize) {
-        this.validateInput(contentStream, topic, lastAckedMessageIndex);
-        this.contentStream = contentStream;
-        this.topic = topic;
-        this.lastAckedMessageIndex = lastAckedMessageIndex;
-        this.maxRequestSize = maxRequestSize;
-    }
-
-    @Override
-    public String toString() {
-        return "topic: '" + this.topic + "'; delimiter: '" + new String(this.delimiterBytes, StandardCharsets.UTF_8) + "'";
-    }
-
-    int getLastAckedMessageIndex() {
-        return this.lastAckedMessageIndex;
-    }
-
-    int getMaxRequestSize() {
-        return this.maxRequestSize;
-    }
-
-    byte[] getKeyBytes() {
-        return this.keyBytes;
-    }
-
-    byte[] getDelimiterBytes() {
-        return this.delimiterBytes;
-    }
-
-    InputStream getContentStream() {
-        return this.contentStream;
-    }
-
-    String getTopic() {
-        return this.topic;
-    }
-
-    void setKeyBytes(byte[] keyBytes) {
-        if (this.keyBytes == null) {
-            if (keyBytes != null) {
-                this.assertBytesValid(keyBytes);
-                this.keyBytes = keyBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
-        }
-    }
-
-    void setDelimiterBytes(byte[] delimiterBytes) {
-        if (this.delimiterBytes == null) {
-            if (delimiterBytes != null) {
-                this.assertBytesValid(delimiterBytes);
-                this.delimiterBytes = delimiterBytes;
-            }
-        } else {
-            throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
-        }
-    }
-
-    private void assertBytesValid(byte[] bytes) {
-        if (bytes != null) {
-            if (bytes.length == 0) {
-                throw new IllegalArgumentException("'bytes' must not be empty");
-            }
-        }
-    }
-
-    private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
-        if (contentStream == null) {
-            throw new IllegalArgumentException("'contentStream' must not be null");
-        } else if (topic == null || topic.trim().length() == 0) {
-            throw new IllegalArgumentException("'topic' must not be null or empty");
-        } else if (lastAckedMessageIndex < -1) {
-            throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 28b8393..2a35f79 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.kafka.pubsub.PublishKafka
-org.apache.nifi.processors.kafka.pubsub.ConsumeKafka
\ No newline at end of file
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka

http://git-wip-us.apache.org/repos/asf/nifi/blob/92cca96d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
deleted file mode 100644
index 19c64af..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-public class KafkaPublisherTest {
-
-    private static EmbeddedKafka kafkaLocal;
-
-    private static EmbeddedKafkaProducerHelper producerHelper;
-
-    @BeforeClass
-    public static void beforeClass() {
-        kafkaLocal = new EmbeddedKafka();
-        kafkaLocal.start();
-        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        producerHelper.close();
-        kafkaLocal.stop();
-    }
-
-    @Test
-    public void validateSuccessfulSendAsWhole() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsWhole";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(0, result.getLastMessageAcked());
-        assertEquals(1, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    @Test
-    public void validateSuccessfulSendAsDelimited() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream(
-                "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsDelimited";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(3, result.getLastMessageAcked());
-        assertEquals(4, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    /*
-     * This test simulates the condition where not all messages were ACKed by
-     * Kafka
-     */
-    @Test
-    public void validateRetries() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateSuccessfulReSendOfFailedSegments";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 1;
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String m1 = new String(iter.next().message());
-        String m2 = new String(iter.next().message());
-        assertEquals("Hello Kafka3", m1);
-        assertEquals("Hello Kafka4", m2);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 2;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-
-        m1 = new String(iter.next().message());
-        assertEquals("Hello Kafka4", m1);
-
-        publisher.close();
-    }
-
-    /*
-     * Similar to the above test, but it sets the first retry index to the last
-     * possible message index and second index to an out of bound index. The
-     * expectation is that no messages will be sent to Kafka
-     */
-    @Test
-    public void validateRetriesWithWrongIndex() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateRetriesWithWrongIndex";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 3;
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 6;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        publisher.close();
-    }
-
-    @Test
-    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
-        String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
-        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithMultiByteCharacters";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-
-        publisher.publish(publishingContext);
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
-        assertEquals(data, r);
-    }
-
-    @Test
-    public void validateWithNonDefaultPartitioner() throws Exception {
-        String data = "fooandbarandbaz";
-        InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithNonDefaultPartitioner";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-        PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-        publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
-
-        try {
-            publisher.publish(publishingContext);
-            // partitioner should be invoked 3 times
-            assertTrue(TestPartitioner.counter == 3);
-            publisher.close();
-        } finally {
-            TestPartitioner.counter = 0;
-        }
-    }
-
-    private Properties buildProducerProperties() {
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
-        kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
-        kafkaProperties.put("auto.create.topics.enable", "true");
-        return kafkaProperties;
-    }
-
-    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
-        Properties props = new Properties();
-        props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
-        props.put("group.id", "test");
-        props.put("consumer.timeout.ms", "500");
-        props.put("auto.offset.reset", "smallest");
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
-        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
-        Map<String, Integer> topicCountMap = new HashMap<>(1);
-        topicCountMap.put(topic, 1);
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
-        return iter;
-    }
-
-    public static class TestPartitioner implements Partitioner {
-
-        static int counter;
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-            // nothing to do, test
-        }
-
-        @Override
-        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
-                Cluster cluster) {
-            counter++;
-            return 0;
-        }
-
-        @Override
-        public void close() {
-            counter = 0;
-        }
-    }
-}