Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/19 13:50:26 UTC

[2/5] nifi git commit: NIFI-5518: Added processors for integrating with Apache Kafka 2.0

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
new file mode 100644
index 0000000..dd89164
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
@@ -0,0 +1,144 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafkaRecord</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor publishes the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+            with Kafka 2.0. The contents of the incoming FlowFile will be read using the
+            configured Record Reader. Each record will then be serialized using the configured
+            Record Writer, and this serialized form will be the content of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+        </p>
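+        <p>
+            As an illustration (assuming, purely for this example, a JSON Record Reader and a CSV
+            Record Writer are configured), an incoming FlowFile with the content:
+            <pre>
+    [{"name": "John", "age": 30}, {"name": "Jane", "age": 29}]
+            </pre>
+            would be published as two separate Kafka messages, one per record, each in its
+            serialized (CSV) form:
+            <pre>
+    John,30
+    Jane,29
+            </pre>
+        </p>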
+
+
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+            <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+            <pre>
+    SSL://host.name:port
+            </pre>
+            In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the certificate of the certificate authority that signed the broker's certificate.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+            In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+            <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+            <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, when using GSSAPI, the JAAS configuration can be provided by specifying the
+            Kerberos Principal and Kerberos Keytab directly in the processor properties. This will dynamically
+            create a JAAS configuration like the one above, and it will take precedence over the
+            java.security.auth.login.config system property.
+        </p>
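+        <p>
+            For example, the following property values (the paths shown are illustrative) would cause the
+            processor to generate a JAAS configuration equivalent to the file shown above:
+            <pre>
+    Kerberos Principal: nifi@YOURREALM.COM
+    Kerberos Keytab: /path/to/nifi.keytab
+            </pre>
+        </p>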
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
new file mode 100644
index 0000000..1d26464
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
@@ -0,0 +1,156 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PublishKafka</title>
+        <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description</h2>
+        <p>
+            This Processor publishes the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+            with Kafka 2.0. The contents of the FlowFile become the contents of a Kafka message.
+            This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+        </p>
+
+        <p>
+            The Processor allows the user to configure an optional Message Demarcator that
+            can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used
+            to indicate that each line of text in the FlowFile's contents should be sent as a
+            separate message. Multi-character demarcators (e.g., 'my custom demarcator') are also
+            supported. If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the demarcator, if some messages are
+            successfully sent but other messages fail to send, the resulting FlowFile will be
+            considered a failed FlowFile and will have additional attributes to that effect.
+            One such attribute is 'failed.last.idx', which indicates the index of the last message
+            that was successfully ACKed by Kafka (if no demarcator is used, the value of this index will be -1).
+            This allows PublishKafka to re-send only the un-ACKed messages on the next retry.
+        </p>
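+        <p>
+            As an illustration, with the Message Demarcator set to <i>\n</i>, a FlowFile whose content is:
+            <pre>
+    Apache NiFi
+    Apache Kafka
+            </pre>
+            would be sent as two separate messages, "Apache NiFi" and "Apache Kafka". If the first message
+            were ACKed but the second failed, the FlowFile would be treated as failed, with 'failed.last.idx'
+            indicating that only the second message needs to be re-sent on the next attempt.
+        </p>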
+
+
+        <h2>Security Configuration</h2>
+        <p>
+            The Security Protocol property allows the user to specify the protocol for communicating
+            with the Kafka broker. The following sections describe each of the protocols in further detail.
+        </p>
+        <h3>PLAINTEXT</h3>
+        <p>
+            This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+            In order to use this option the broker must be configured with a listener of the form:
+            <pre>
+    PLAINTEXT://host.name:port
+            </pre>
+        </p>
+        <h3>SSL</h3>
+        <p>
+            This option provides an encrypted connection to the broker, with optional client authentication. In order
+            to use this option the broker must be configured with a listener of the form:
+            <pre>
+    SSL://host.name:port
+            </pre>
+            In addition, the processor must have an SSL Context Service selected.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+            not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+            a truststore containing the certificate of the certificate authority that signed the broker's certificate.
+        </p>
+        <p>
+            If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+            In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+            a truststore as described above.
+        </p>
+        <h3>SASL_PLAINTEXT</h3>
+        <p>
+            This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_PLAINTEXT://host.name:port
+            </pre>
+            In addition, the Kerberos Service Name must be specified in the processor.
+        </p>
+        <h4>SASL_PLAINTEXT - GSSAPI</h4>
+        <p>
+            If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+            JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+            NiFi's bootstrap.conf, such as:
+            <pre>
+    java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+            </pre>
+        </p>
+        <p>
+            An example of the JAAS config file would be the following:
+            <pre>
+    KafkaClient {
+        com.sun.security.auth.module.Krb5LoginModule required
+        useKeyTab=true
+        storeKey=true
+        keyTab="/path/to/nifi.keytab"
+        serviceName="kafka"
+        principal="nifi@YOURREALM.COM";
+    };
+            </pre>
+        <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+        </p>
+        <p>
+            Alternatively, when using GSSAPI, the JAAS configuration can be provided by specifying the
+            Kerberos Principal and Kerberos Keytab directly in the processor properties. This will dynamically
+            create a JAAS configuration like the one above, and it will take precedence over the
+            java.security.auth.login.config system property.
+        </p>
+        <h4>SASL_PLAINTEXT - PLAIN</h4>
+        <p>
+            If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+            the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+            be the following:
+            <pre>
+    KafkaClient {
+      org.apache.kafka.common.security.plain.PlainLoginModule required
+      username="nifi"
+      password="nifi-password";
+    };
+            </pre>
+        </p>
+        <p>
+            <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+            the username and password unencrypted.
+        </p>
+        <p>
+            <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+            it visible to components in other NARs that may access the providers. There is currently a known issue
+            where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+        </p>
+        <h3>SASL_SSL</h3>
+        <p>
+            This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+            option the broker must be configured with a listener of the form:
+            <pre>
+    SASL_SSL://host.name:port
+            </pre>
+        </p>
+        <p>
+            See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+            depending on the SASL mechanism (GSSAPI or PLAIN).
+        </p>
+        <p>
+            See the SSL section for a description of how to configure the SSL Context Service based on the
+            ssl.client.auth property.
+        </p>
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
new file mode 100644
index 0000000..810c235
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerPoolTest {
+
+    private Consumer<byte[], byte[]> consumer = null;
+    private ProcessSession mockSession = null;
+    private ProcessContext mockContext = Mockito.mock(ProcessContext.class);
+    private ProvenanceReporter mockReporter = null;
+    private ConsumerPool testPool = null;
+    private ConsumerPool testDemarcatedPool = null;
+    private ComponentLog logger = null;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        consumer = mock(Consumer.class);
+        logger = mock(ComponentLog.class);
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger,
+                true,
+                StandardCharsets.UTF_8,
+                null) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
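+        // Same configuration as testPool above, but with a message demarcator and a topic name pattern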
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger,
+                true,
+                StandardCharsets.UTF_8,
+                Pattern.compile(".*")) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
+
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
+
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(4, stats.leasesObtainedCount);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        for (int i = 0; i < 100; i++) {
+            try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+                for (int j = 0; j < 100; j++) {
+                    lease.poll();
+                }
+            }
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(100, stats.leasesObtainedCount);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
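+                // expected: the mocked consumer throws KafkaException on poll()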
+
+            }
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
new file mode 100644
index 0000000..399d426
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class ITConsumeKafka_2_0 {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
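+        // continuePolling() answers true twice, then false: the lease polls exactly twice before the processor commits and closes it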
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_2_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
new file mode 100644
index 0000000..9b05188
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class TestConsumeKafkaRecord_2_0 {
+
+    private ConsumerLease mockLease = null;
+    private ConsumerPool mockConsumerPool = null;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+
+        ConsumeKafkaRecord_2_0 proc = new ConsumeKafkaRecord_2_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.RECORD_READER, readerId);
+        runner.setProperty(ConsumeKafkaRecord_2_0.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafkaRecord_2_0.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
+        runner.assertValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
new file mode 100644
index 0000000..297772e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class TestConsumeKafka_2_0 {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka_2_0.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+        }
+
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+        }
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
+        runner.assertValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+
+        runner.setVariable("keytab", "src/test/resources/server.properties");
+        runner.setVariable("principal", "nifi@APACHE.COM");
+        runner.setVariable("service", "kafka");
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}");
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
+        runner.assertValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..f95c3c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+public class TestInFlightMessageTracker {
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.awaitCompletion(1L);
+    }
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        final Future<?> future = exec.submit(() -> {
+            try {
+                tracker.awaitCompletion(10000L);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.incrementAcknowledgedCount(flowFile);
+
+        future.get();
+        exec.shutdown();
+    }
+
+    private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
+        try {
+            tracker.awaitCompletion(10L);
+            Assert.fail("Expected timeout");
+        } catch (final TimeoutException te) {
+            // expected
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
new file mode 100644
index 0000000..39a135d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestPublishKafkaRecord_2_0 {
+
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
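+    // The processor is wired to a mocked PublisherPool/PublisherLease so no Kafka
+    // broker is needed; the record-oriented publish(...) is stubbed with
+    // doCallRealMethod() so it fans out to the per-record publish calls that the
+    // tests verify.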
+    @Before
+    public void setup() throws InitializationException, IOException {
+        mockPool = mock(PublisherPool.class);
+        mockLease = mock(PublisherLease.class);
+        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            any(RecordSchema.class), any(String.class), any(String.class));
+
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        runner = TestRunners.newTestRunner(new PublishKafkaRecord_2_0() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        });
+
+        runner.setProperty(PublishKafkaRecord_2_0.TOPIC, TOPIC_NAME);
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(PublishKafkaRecord_2_0.RECORD_READER, readerId);
+        runner.setProperty(PublishKafkaRecord_2_0.RECORD_WRITER, writerId);
+        runner.setProperty(PublishKafka_2_0.DELIVERY_GUARANTEE, PublishKafka_2_0.DELIVERY_REPLICATED);
+    }
+
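+    // Each test verifies the lease lifecycle: one publish() per FlowFile, then
+    // exactly one complete() and close() for the processor invocation.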
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
+        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 2);
+
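+        // Two record-set publish calls (one per FlowFile) fan out into four
+        // per-record publish calls, since each FlowFile carries two records.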
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+            .count());
+    }
+
+    @Test
+    public void testNoRecordsInFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue(new byte[0]));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 0);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("msg.count", "0");
+    }
+
+
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+        when(mockLease.complete()).thenReturn(result);
+
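+        // When any FlowFile in the batch fails, all of them are routed to
+        // failure and no msg.count attribute is written.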
+        runner.run();
+        runner.assertTransferCount(PublishKafkaRecord_2_0.REL_SUCCESS, 0);
+        runner.assertTransferCount(PublishKafkaRecord_2_0.REL_FAILURE, 4);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_FAILURE).stream()
+            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+    }
+
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+
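+    // Builds a stubbed PublishResult from explicit per-FlowFile message counts
+    // and failure causes, rejecting any FlowFile listed as both a success and a
+    // failure.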
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
new file mode 100644
index 0000000..47cf072
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestPublishKafka_2_0 {
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
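+    // Mirrors the record-oriented test above, but exercises the raw
+    // publish(FlowFile, InputStream, ...) path of PublishKafka_2_0.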
+    @Before
+    public void setup() {
+        mockPool = mock(PublisherPool.class);
+        mockLease = mock(PublisherLease.class);
+
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        runner = TestRunners.newTestRunner(new PublishKafka_2_0() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        });
+
+        runner.setProperty(PublishKafka_2_0.TOPIC, TOPIC_NAME);
+        runner.setProperty(PublishKafka_2_0.DELIVERY_GUARANTEE, PublishKafka_2_0.DELIVERY_REPLICATED);
+    }
+
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 2);
+
+        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
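+        // msg.count reflects how many Kafka messages the stubbed PublishResult
+        // reports for each FlowFile.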
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_0.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
+            .count());
+        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_2_0.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
+            .count());
+    }
+
+
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 10);
+        msgCounts.put(flowFiles.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertTransferCount(PublishKafka_2_0.REL_SUCCESS, 0);
+        runner.assertTransferCount(PublishKafka_2_0.REL_FAILURE, 4);
+
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafka_2_0.REL_FAILURE).stream()
+            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
+    }
+
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}