You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/19 13:50:26 UTC
[2/5] nifi git commit: NIFI-5518: Added processors for integrating
with Apache Kafka 2.0
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
new file mode 100644
index 0000000..dd89164
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_0/additionalDetails.html
@@ -0,0 +1,144 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>PublishKafkaRecord</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <h2>Description</h2>
+ <p>
+ This Processor puts the contents of a FlowFile to a Topic in
+ <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+ with the Kafka 2.0 API. The contents of the incoming FlowFile will be read using the
+ configured Record Reader. Each record will then be serialized using the configured
+ Record Writer, and this serialized form will be the content of a Kafka message.
+ This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+ </p>
+
+
+ <h2>Security Configuration</h2>
+ <p>
+ The Security Protocol property allows the user to specify the protocol for communicating
+ with the Kafka broker. The following sections describe each of the protocols in further detail.
+ </p>
+ <h3>PLAINTEXT</h3>
+ <p>
+ This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+ In order to use this option the broker must be configured with a listener of the form:
+ <pre>
+ PLAINTEXT://host.name:port
+ </pre>
+ </p>
+ <h3>SSL</h3>
+ <p>
+ This option provides an encrypted connection to the broker, with optional client authentication. In order
+ to use this option the broker must be configured with a listener of the form:
+ <pre>
+ SSL://host.name:port
+ </pre>
+ In addition, the processor must have an SSL Context Service selected.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+ not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+ a truststore containing the public key of the certificate authority used to sign the broker's key.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+ In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+ a truststore as described above.
+ </p>
+ <h3>SASL_PLAINTEXT</h3>
+ <p>
+ This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_PLAINTEXT://host.name:port
+ </pre>
+ In addition, the Kerberos Service Name must be specified in the processor.
+ </p>
+ <h4>SASL_PLAINTEXT - GSSAPI</h4>
+ <p>
+ If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+ JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+ NiFi's bootstrap.conf, such as:
+ <pre>
+ java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+ </pre>
+ </p>
+ <p>
+ An example of the JAAS config file would be the following:
+ <pre>
+ KafkaClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/path/to/nifi.keytab"
+ serviceName="kafka"
+ principal="nifi@YOURREALM.COM";
+ };
+ </pre>
+ <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+ </p>
+ <p>
+ Alternatively, the JAAS
+ configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+ directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+ will take precedence over the java.security.auth.login.config system property.
+ </p>
+ <h4>SASL_PLAINTEXT - PLAIN</h4>
+ <p>
+ If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+ the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+ be the following:
+ <pre>
+ KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="nifi"
+ password="nifi-password";
+ };
+ </pre>
+ </p>
+ <p>
+ <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+ the username and password unencrypted.
+ </p>
+ <p>
+ <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+ it visible to components in other NARs that may access the providers. There is currently a known issue
+ where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+ </p>
+ <h3>SASL_SSL</h3>
+ <p>
+ This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_SSL://host.name:port
+ </pre>
+ </p>
+ <p>
+ See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+ depending on the SASL mechanism (GSSAPI or PLAIN).
+ </p>
+ <p>
+ See the SSL section for a description of how to configure the SSL Context Service based on the
+ ssl.client.auth property.
+ </p>
+ </body>
+</html>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
new file mode 100644
index 0000000..1d26464
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0/additionalDetails.html
@@ -0,0 +1,156 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>PublishKafka</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <h2>Description</h2>
+ <p>
+ This Processor puts the contents of a FlowFile to a Topic in
+ <a href="http://kafka.apache.org/">Apache Kafka</a> using the KafkaProducer API available
+ with the Kafka 2.0 API. The content of a FlowFile becomes the contents of a Kafka message.
+ This message is optionally assigned a key by using the &lt;Kafka Key&gt; Property.
+ </p>
+
+ <p>
+ The Processor allows the user to configure an optional Message Demarcator that
+ can be used to send many messages per FlowFile. For example, a <i>\n</i> could be used
+ to indicate that the contents of the FlowFile should be used to send one message
+ per line of text. It also supports multi-char demarcators (e.g., 'my custom demarcator').
+ If the property is not set, the entire contents of the FlowFile
+ will be sent as a single message. When using the demarcator, if some messages are
+ successfully sent but other messages fail to send, the resulting FlowFile will be
+ considered a failed FlowFile and will have additional attributes to that effect.
+ One of such attributes is 'failed.last.idx' which indicates the index of the last message
+ that was successfully ACKed by Kafka (if no demarcator is used, the value of this index will be -1).
+ This will allow PublishKafka to only re-send un-ACKed messages on the next re-try.
+ </p>
+
+
+ <h2>Security Configuration</h2>
+ <p>
+ The Security Protocol property allows the user to specify the protocol for communicating
+ with the Kafka broker. The following sections describe each of the protocols in further detail.
+ </p>
+ <h3>PLAINTEXT</h3>
+ <p>
+ This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+ In order to use this option the broker must be configured with a listener of the form:
+ <pre>
+ PLAINTEXT://host.name:port
+ </pre>
+ </p>
+ <h3>SSL</h3>
+ <p>
+ This option provides an encrypted connection to the broker, with optional client authentication. In order
+ to use this option the broker must be configured with a listener of the form:
+ <pre>
+ SSL://host.name:port
+ </pre>
+ In addition, the processor must have an SSL Context Service selected.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+ not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+ a truststore containing the public key of the certificate authority used to sign the broker's key.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+ In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+ a truststore as described above.
+ </p>
+ <h3>SASL_PLAINTEXT</h3>
+ <p>
+ This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_PLAINTEXT://host.name:port
+ </pre>
+ In addition, the Kerberos Service Name must be specified in the processor.
+ </p>
+ <h4>SASL_PLAINTEXT - GSSAPI</h4>
+ <p>
+ If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+ JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+ NiFi's bootstrap.conf, such as:
+ <pre>
+ java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+ </pre>
+ </p>
+ <p>
+ An example of the JAAS config file would be the following:
+ <pre>
+ KafkaClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/path/to/nifi.keytab"
+ serviceName="kafka"
+ principal="nifi@YOURREALM.COM";
+ };
+ </pre>
+ <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+ </p>
+ <p>
+ Alternatively, the JAAS
+ configuration when using GSSAPI can be provided by specifying the Kerberos Principal and Kerberos Keytab
+ directly in the processor properties. This will dynamically create a JAAS configuration like above, and
+ will take precedence over the java.security.auth.login.config system property.
+ </p>
+ <h4>SASL_PLAINTEXT - PLAIN</h4>
+ <p>
+ If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+ the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+ be the following:
+ <pre>
+ KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="nifi"
+ password="nifi-password";
+ };
+ </pre>
+ </p>
+ <p>
+ <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+ the username and password unencrypted.
+ </p>
+ <p>
+ <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+ it visible to components in other NARs that may access the providers. There is currently a known issue
+ where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+ </p>
+ <h3>SASL_SSL</h3>
+ <p>
+ This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_SSL://host.name:port
+ </pre>
+ </p>
+ <p>
+ See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+ depending on the SASL mechanism (GSSAPI or PLAIN).
+ </p>
+ <p>
+ See the SSL section for a description of how to configure the SSL Context Service based on the
+ ssl.client.auth property.
+ </p>
+ </body>
+</html>
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
new file mode 100644
index 0000000..810c235
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerPoolTest {
+ // Unit tests for ConsumerPool: each test drives leases against a mocked Kafka Consumer and checks the resulting PoolStats counters.
+ private Consumer<byte[], byte[]> consumer = null; // Mockito mock handed out by both pools' createKafkaConsumer()
+ private ProcessSession mockSession = null; // mock session passed to obtainConsumer(); create()/commit() calls are verified
+ private ProcessContext mockContext = Mockito.mock(ProcessContext.class); // created once at field initialization, not in setup()
+ private ProvenanceReporter mockReporter = null; // returned by mockSession.getProvenanceReporter()
+ private ConsumerPool testPool = null; // pool built with null in the second and last constructor arguments
+ private ConsumerPool testDemarcatedPool = null; // pool built with "--demarcator--" bytes and a ".*" Pattern
+ private ComponentLog logger = null; // mock logger handed to both pools
+ // Rebuilds the mocks and both pools before every test; each pool overrides createKafkaConsumer() so no real Kafka client is created.
+ @Before
+ @SuppressWarnings("unchecked")
+ public void setup() {
+ consumer = mock(Consumer.class);
+ logger = mock(ComponentLog.class);
+ mockSession = mock(ProcessSession.class);
+ mockReporter = mock(ProvenanceReporter.class);
+ when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+ testPool = new ConsumerPool(
+ 1,
+ null, // testDemarcatedPool passes demarcator bytes in this position; presumably the demarcator argument - confirm against ConsumerPool
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger,
+ true,
+ StandardCharsets.UTF_8,
+ null) { // testDemarcatedPool passes Pattern.compile(".*") in this position
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer; // always the shared mock
+ }
+ };
+ testDemarcatedPool = new ConsumerPool(
+ 1,
+ "--demarcator--".getBytes(StandardCharsets.UTF_8),
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger,
+ true,
+ StandardCharsets.UTF_8,
+ Pattern.compile(".*")) {
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer; // always the shared mock
+ }
+ };
+ }
+ // Four sequential leases that each poll an empty record set: expects one consumer created/closed and four leases counted.
+ @Test
+ public void validatePoolSimpleCreateClose() throws Exception {
+ // every poll returns an empty ConsumerRecords for topic "nifi"
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ }
+ testPool.close();
+ verify(mockSession, times(0)).create(); // no records were returned, so no session.create() is expected
+ verify(mockSession, times(0)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(4, stats.leasesObtainedCount);
+ }
+ // One lease on the non-demarcated pool polls three records and commits: expects three session.create() calls and one session.commit().
+ @Test
+ @SuppressWarnings("unchecked")
+ public void validatePoolSimpleCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
+ };
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+ // first poll yields the three records; any later poll yields an empty set
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ lease.commit();
+ }
+ testPool.close();
+ verify(mockSession, times(3)).create(); // three create() calls expected for the three polled records
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ // 100 leases, each polling 100 times with empty results: expects a single consumer created/closed and 100 leases counted.
+ @Test
+ public void validatePoolSimpleBatchCreateClose() throws Exception {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ for (int i = 0; i < 100; i++) {
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ for (int j = 0; j < 100; j++) {
+ lease.poll();
+ }
+ }
+ }
+ testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(100, stats.leasesObtainedCount);
+ }
+ // Same three-record scenario as validatePoolSimpleCreatePollClose, but on testDemarcatedPool: expects only one session.create().
+ @Test
+ @SuppressWarnings("unchecked")
+ public void validatePoolBatchCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
+ };
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+ // first poll yields the three records; any later poll yields an empty set
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
+ lease.poll();
+ lease.commit();
+ }
+ testDemarcatedPool.close();
+ verify(mockSession, times(1)).create(); // presumably the three records are bundled into one FlowFile - confirm against ConsumerLease
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testDemarcatedPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ // The mocked consumer throws on poll: lease.poll() must propagate KafkaException, and the pool must still report one consumer created and closed.
+ @Test
+ public void validatePoolConsumerFails() throws Exception {
+ // every poll on the mock throws
+ when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+ try {
+ lease.poll();
+ fail(); // reaching this line means the exception was not propagated
+ } catch (final KafkaException ke) {
+ // expected: the KafkaException is the outcome under test, so it is deliberately ignored
+ }
+ }
+ testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ // Builds a ConsumerRecords holding one record per rawRecords entry, all under a single topic/partition, with offsets counting up from startingOffset.
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+ final TopicPartition tPart = new TopicPartition(topic, partition);
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ long offset = startingOffset;
+ for (final byte[] rawRecord : rawRecords) {
+ final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord); // random UUID bytes as the key (platform default charset), rawRecord as the value
+ records.add(rec);
+ }
+ map.put(tPart, records); // the partition entry is added even when rawRecords is empty
+ return new ConsumerRecords(map);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
new file mode 100644
index 0000000..399d426
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class ITConsumeKafka_2_0 {
+ // Runs ConsumeKafka_2_0 once against a mocked ConsumerPool/ConsumerLease and verifies the exact sequence of lease calls.
+ ConsumerLease mockLease = null; // mock lease returned by mockConsumerPool.obtainConsumer(...)
+ ConsumerPool mockConsumerPool = null; // mock pool returned by the overridden createConsumerPool(...)
+ // Fresh mocks before every test so call counts do not leak between tests.
+ @Before
+ public void setup() {
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
+ }
+ // Topic list "foo,bar", commit succeeds: continuePolling() answers true, true, false, so two polls and one commit are expected.
+ @Test
+ public void validateGetAllMessages() throws Exception {
+ String groupName = "validateGetAllMessages";
+ // stub the pool and lease: two successful polling rounds, then stop
+ when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
+ // processor subclass that hands back the mock pool instead of building a real one
+ ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+ @Override
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
+ }
+ };
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+ runner.run(1, false);
+ // three continuePolling() checks (true, true, false) around two polls, then one commit and one close
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
+ }
+ // Same flow as validateGetAllMessages, but TOPICS is the regex "(fo.*)|(ba)" and TOPIC_TYPE is set to "pattern".
+ @Test
+ public void validateGetAllMessagesPattern() throws Exception {
+ String groupName = "validateGetAllMessagesPattern";
+ // stub the pool and lease: two successful polling rounds, then stop
+ when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
+ // processor subclass that hands back the mock pool instead of building a real one
+ ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+ @Override
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
+ }
+ };
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "(fo.*)|(ba)");
+ runner.setProperty(ConsumeKafka_2_0.TOPIC_TYPE, "pattern");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+ runner.run(1, false);
+ // three continuePolling() checks (true, true, false) around two polls, then one commit and one close
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
+ }
+ // commit() is stubbed to return FALSE and continuePolling() answers true, false: one poll, one commit and one close are still expected.
+ @Test
+ public void validateGetErrorMessages() throws Exception {
+ String groupName = "validateGetErrorMessages";
+ // stub the pool and lease: one polling round, then a failing commit
+ when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(true, false);
+ when(mockLease.commit()).thenReturn(Boolean.FALSE);
+ // processor subclass that hands back the mock pool instead of building a real one
+ ConsumeKafka_2_0 proc = new ConsumeKafka_2_0() {
+ @Override
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
+ }
+ };
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+ runner.run(1, false);
+ // two continuePolling() checks (true, false) around a single poll, then one commit and one close
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+ verify(mockLease, times(2)).continuePolling();
+ verify(mockLease, times(1)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
new file mode 100644
index 0000000..9b05188
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ConsumeKafkaRecord_2_0} that run the processor against a mocked
+ * {@link ConsumerPool} and {@link ConsumerLease}.
+ */
+public class TestConsumeKafkaRecord_2_0 {
+
+    private ConsumerLease mockLease;
+    private ConsumerPool mockConsumerPool;
+    private TestRunner runner;
+
+    /**
+     * Creates fresh mocks, wraps a processor that returns the mocked pool in a {@link TestRunner},
+     * and registers/enables the mock record reader and writer services used by every test.
+     */
+    @Before
+    public void setup() throws InitializationException {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+
+        ConsumeKafkaRecord_2_0 proc = new ConsumeKafkaRecord_2_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.RECORD_READER, readerId);
+        runner.setProperty(ConsumeKafkaRecord_2_0.RECORD_WRITER, writerId);
+    }
+
+    /**
+     * Checks that the key deserializer property rejects the value "Foo" and that enabling
+     * auto-commit makes the configuration invalid.
+     */
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        configureConsumer("foo", "foo");
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    /**
+     * Checks the validation messages produced for a missing, an empty and a blank Group ID.
+     */
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        configureConsumer("foo", "foo");
+
+        runner.removeProperty(ConsumeKafkaRecord_2_0.GROUP_ID);
+        assertInvalidWithMessage("invalid because Group ID is required");
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "");
+        assertInvalidWithMessage("must contain at least one character that is not white space");
+
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, " ");
+        assertInvalidWithMessage("must contain at least one character that is not white space");
+    }
+
+    /**
+     * Runs the processor once with a comma-separated topic list and a lease that permits two polls
+     * before a commit stubbed to return true.
+     */
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        configureConsumer("foo,bar", "validateGetAllMessages");
+        runner.run(1, false);
+
+        verifyLeaseInteractions(2);
+    }
+
+    /**
+     * Same flow as {@link #validateGetAllMessages()}, but the topics are given as a pattern.
+     */
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        configureConsumer("(fo.*)|(ba)", "validateGetAllMessagesPattern");
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPIC_TYPE, "pattern");
+        runner.run(1, false);
+
+        verifyLeaseInteractions(2);
+    }
+
+    /**
+     * Runs the processor once with a lease that permits a single poll and whose commit is stubbed
+     * to return false; the lease must still be committed and closed exactly once.
+     */
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        configureConsumer("foo,bar", "validateGetErrorMessages");
+        runner.run(1, false);
+
+        verifyLeaseInteractions(1);
+    }
+
+    /**
+     * Walks through the SASL-related properties step by step and asserts after each change
+     * whether the configuration is valid.
+     */
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        configureConsumer("foo", "foo");
+
+        // SASL_PLAINTEXT alone is rejected; adding the JAAS service name makes it valid.
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
+        runner.assertValid();
+
+        // A principal without a keytab, or with a keytab path that is not a file, is rejected.
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+    /**
+     * Applies the topic, group id and "earliest" offset-reset settings shared by the tests.
+     *
+     * @param topics value for the Topics property
+     * @param groupId value for the Group ID property
+     */
+    private void configureConsumer(final String topics, final String groupId) {
+        runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, topics);
+        runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, groupId);
+        runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
+    }
+
+    /**
+     * Asserts that the current configuration is invalid and that the validation failure message
+     * contains the given fragment.
+     *
+     * @param expectedFragment text that must appear in the validation failure message
+     */
+    private void assertInvalidWithMessage(final String expectedFragment) {
+        boolean rejected = false;
+        String failureMessage = null;
+        try {
+            runner.assertValid();
+        } catch (final AssertionError e) {
+            rejected = true;
+            failureMessage = e.getMessage();
+        }
+
+        // fail() is called outside the try block so that its own AssertionError is not swallowed by the catch.
+        if (!rejected) {
+            fail("Expected the processor configuration to be invalid, but it was reported as valid");
+        }
+        assertTrue("Unexpected validation message: " + failureMessage,
+                failureMessage != null && failureMessage.contains(expectedFragment));
+    }
+
+    /**
+     * Verifies that a single lease was obtained, polled the expected number of times, committed
+     * once and closed once, with no other interaction on either mock.
+     *
+     * @param expectedPolls number of poll() calls expected; continuePolling() is expected once more than that
+     */
+    private void verifyLeaseInteractions(final int expectedPolls) {
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject());
+        verify(mockLease, times(expectedPolls + 1)).continuePolling();
+        verify(mockLease, times(expectedPolls)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
new file mode 100644
index 0000000..297772e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class TestConsumeKafka_2_0 {
+
+ ConsumerLease mockLease = null;
+ ConsumerPool mockConsumerPool = null;
+
+ @Before
+ public void setup() {
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
+ }
+
+ @Test
+ public void validateCustomValidatorSettings() throws Exception {
+ ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+ runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ runner.assertValid();
+ runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
+ runner.assertNotValid();
+ runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ runner.assertValid();
+ runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ runner.assertValid();
+ runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void validatePropertiesValidation() throws Exception {
+ ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+
+ runner.removeProperty(ConsumeKafka_2_0.GROUP_ID);
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("invalid because Group ID is required"));
+ }
+
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+ }
+
+ @Test
+ public void testJaasConfiguration() throws Exception {
+ ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
+ TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
+ runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
+ runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
+
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
+ runner.assertValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+ runner.assertNotValid();
+
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+ runner.assertValid();
+
+ runner.setVariable("keytab", "src/test/resources/server.properties");
+ runner.setVariable("principal", "nifi@APACHE.COM");
+ runner.setVariable("service", "kafka");
+ runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
+ runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}s");
+ runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
+ runner.assertValid();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..f95c3c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.processors.kafka.pubsub.InFlightMessageTracker;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests for {@link InFlightMessageTracker#awaitCompletion(long)} with two sent messages that are
+ * acknowledged either before or while a caller is waiting.
+ */
+public class TestInFlightMessageTracker {
+
+    /**
+     * awaitCompletion must time out until both sent messages are acknowledged, and must return
+     * without a timeout once they are.
+     */
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.awaitCompletion(1L);
+    }
+
+    /**
+     * A background task calls awaitCompletion while the test thread acknowledges both messages;
+     * the background call must finish without throwing.
+     */
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        try {
+            final Future<?> future = exec.submit(() -> {
+                try {
+                    tracker.awaitCompletion(10000L);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            tracker.incrementAcknowledgedCount(flowFile);
+            tracker.incrementAcknowledgedCount(flowFile);
+
+            // Propagates any failure from the background task as an ExecutionException.
+            future.get();
+        } finally {
+            // Always release the worker thread, even when the test fails or times out.
+            exec.shutdownNow();
+        }
+    }
+
+    /**
+     * Asserts that awaitCompletion with a 10 ms limit throws a TimeoutException.
+     *
+     * @param tracker the tracker expected to still have outstanding messages
+     */
+    private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
+        try {
+            tracker.awaitCompletion(10L);
+            Assert.fail("Expected timeout");
+        } catch (final TimeoutException te) {
+            // expected
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
new file mode 100644
index 0000000..39a135d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PublishKafkaRecord_2_0} that run the processor against a mocked
+ * {@link PublisherPool} and {@link PublisherLease}.
+ */
+public class TestPublishKafkaRecord_2_0 {
+
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
+    /**
+     * Creates the mocks, wires a processor that returns the mocked pool into a {@link TestRunner},
+     * and registers/enables the mock record reader and writer services.
+     */
+    @Before
+    public void setup() throws InitializationException, IOException {
+        mockLease = mock(PublisherLease.class);
+        mockPool = mock(PublisherPool.class);
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        // The record-set publish overload runs its real implementation on the mocked lease.
+        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+                any(RecordSchema.class), any(String.class), any(String.class));
+
+        final PublishKafkaRecord_2_0 processor = new PublishKafkaRecord_2_0() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        };
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PublishKafkaRecord_2_0.TOPIC, TOPIC_NAME);
+
+        final MockRecordParser recordReader = new MockRecordParser();
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService("record-reader", recordReader);
+        runner.enableControllerService(recordReader);
+
+        final RecordSetWriterFactory recordWriter = new MockRecordWriter("name, age");
+        runner.addControllerService("record-writer", recordWriter);
+        runner.enableControllerService(recordWriter);
+
+        runner.setProperty(PublishKafkaRecord_2_0.RECORD_READER, "record-reader");
+        runner.setProperty(PublishKafkaRecord_2_0.RECORD_WRITER, "record-writer");
+        runner.setProperty(PublishKafka_2_0.DELIVERY_GUARANTEE, PublishKafka_2_0.DELIVERY_REPLICATED);
+    }
+
+    /** One FlowFile, reported as fully successful, is routed to success. */
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("John Doe, 48");
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(enqueued, 1));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+        verifyRecordSetPublished(1);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    /** Three FlowFiles, all reported as successful, are routed to success through a single lease. */
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            enqueued.add(runner.enqueue("John Doe, 48"));
+        }
+        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(enqueued, 1));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 3);
+        verifyRecordSetPublished(3);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    /** One FlowFile, reported as failed, is routed to failure. */
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("John Doe, 48");
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(enqueued));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
+        verifyRecordSetPublished(1);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    /** Three FlowFiles, all reported as failed, are routed to failure. */
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            enqueued.add(runner.enqueue("John Doe, 48"));
+        }
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(enqueued));
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 3);
+        verifyRecordSetPublished(3);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    /**
+     * Two FlowFiles with two records each: expects four calls to the byte[] publish overload and
+     * a msg.count attribute taken from the publish result (10 and 20).
+     */
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> enqueued = new ArrayList<>();
+        enqueued.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
+        enqueued.add(runner.enqueue("John Doe, 48\nJane Doe, 29"));
+
+        final Map<FlowFile, Integer> reportedCounts = new HashMap<>();
+        reportedCounts.put(enqueued.get(0), 10);
+        reportedCounts.put(enqueued.get(1), 20);
+
+        final PublishResult publishResult = createPublishResult(reportedCounts, new HashSet<>(enqueued), Collections.emptyMap());
+        when(mockLease.complete()).thenReturn(publishResult);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 2);
+        verifyRecordSetPublished(2);
+        verify(mockLease, times(4)).publish(any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, countSuccessesWithMsgCount("10"));
+        assertEquals(1, countSuccessesWithMsgCount("20"));
+    }
+
+    /** An empty FlowFile is routed to success with msg.count set to "0". */
+    @Test
+    public void testNoRecordsInFlowFile() throws IOException {
+        final FlowFile emptyFlowFile = runner.enqueue(new byte[0]);
+
+        final Map<FlowFile, Integer> reportedCounts = new HashMap<>();
+        reportedCounts.put(emptyFlowFile, 0);
+
+        final Set<FlowFile> successes = new HashSet<>();
+        successes.add(emptyFlowFile);
+        final PublishResult publishResult = createPublishResult(reportedCounts, successes, Collections.emptyMap());
+        when(mockLease.complete()).thenReturn(publishResult);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_SUCCESS, 1);
+        verifyRecordSetPublished(1);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        final MockFlowFile transferred = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_SUCCESS).get(0);
+        transferred.assertAttributeEquals("msg.count", "0");
+    }
+
+
+    /**
+     * Two of four FlowFiles are reported as failed: all four are expected on the failure
+     * relationship and none of them may carry a msg.count attribute.
+     */
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> enqueued = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            enqueued.add(runner.enqueue("John Doe, 48"));
+        }
+
+        final Map<FlowFile, Integer> reportedCounts = new HashMap<>();
+        reportedCounts.put(enqueued.get(0), 10);
+        reportedCounts.put(enqueued.get(1), 20);
+
+        final Map<FlowFile, Exception> reportedFailures = new HashMap<>();
+        reportedFailures.put(enqueued.get(2), new RuntimeException("Intentional Unit Test Exception"));
+        reportedFailures.put(enqueued.get(3), new RuntimeException("Intentional Unit Test Exception"));
+
+        final Set<FlowFile> successes = new HashSet<>(enqueued.subList(0, 2));
+        final PublishResult publishResult = createPublishResult(reportedCounts, successes, reportedFailures);
+        when(mockLease.complete()).thenReturn(publishResult);
+
+        runner.run();
+
+        runner.assertTransferCount(PublishKafkaRecord_2_0.REL_SUCCESS, 0);
+        runner.assertTransferCount(PublishKafkaRecord_2_0.REL_FAILURE, 4);
+        verifyRecordSetPublished(4);
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_FAILURE).stream()
+                .allMatch(ff -> ff.getAttribute("msg.count") == null));
+    }
+
+
+    /** Verifies how often the record-set publish overload was called with a null key and the test topic. */
+    private void verifyRecordSetPublished(final int expectedCalls) throws IOException {
+        verify(mockLease, times(expectedCalls)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+    }
+
+    /** Counts the FlowFiles on the success relationship whose msg.count attribute equals the given value. */
+    private long countSuccessesWithMsgCount(final String expectedCount) {
+        return runner.getFlowFilesForRelationship(PublishKafkaRecord_2_0.REL_SUCCESS).stream()
+                .filter(ff -> ff.getAttribute("msg.count").equals(expectedCount))
+                .count();
+    }
+
+    /** Builds a result in which the single given FlowFile succeeded with the given message count. */
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        final Set<FlowFile> single = Collections.singleton(successfulFlowFile);
+        return createAllSuccessPublishResult(single, msgCount);
+    }
+
+    /** Builds a result in which every given FlowFile succeeded with the same message count. */
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> countsByFlowFile = successfulFlowFiles.stream()
+                .collect(Collectors.toMap(ff -> ff, ff -> msgCountPerFlowFile));
+        return createPublishResult(countsByFlowFile, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    /** Builds a result in which the single given FlowFile failed. */
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        final Set<FlowFile> single = Collections.singleton(failure);
+        return createFailurePublishResult(single);
+    }
+
+    /** Builds a result in which every given FlowFile failed with its own RuntimeException. */
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> reasons = new HashMap<>();
+        for (final FlowFile failed : failures) {
+            reasons.put(failed, new RuntimeException("Intentional Unit Test Exception"));
+        }
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), reasons);
+    }
+
+    /**
+     * Builds a {@link PublishResult} backed by the given maps.
+     *
+     * @param msgCounts successful message count per FlowFile; FlowFiles without an entry report 0
+     * @param successFlowFiles FlowFiles considered successful; must not overlap with {@code failures}
+     * @param failures failure reason per failed FlowFile
+     * @throws IllegalArgumentException if a FlowFile appears in both the success set and the failure map
+     */
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // Guard against inconsistent test fixtures.
+        for (final FlowFile candidate : successFlowFiles) {
+            if (failures.containsKey(candidate)) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + candidate);
+            }
+        }
+
+        return new PublishResult() {
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                final Integer recorded = msgCounts.get(flowFile);
+                if (recorded == null) {
+                    return 0;
+                }
+                return recorded;
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+
+            @Override
+            public boolean isFailure() {
+                final boolean noFailures = failures.isEmpty();
+                return !noFailures;
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0da4f50e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
new file mode 100644
index 0000000..47cf072
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link PublishKafka_2_0}.
+ * <p>
+ * The processor under test is subclassed so that {@code createPublisherPool} hands back a Mockito mock.
+ * Each test stubs {@code PublisherLease.complete()} with a hand-built {@link PublishResult} and then
+ * checks how FlowFiles were routed and which lease methods were invoked.
+ */
+public class TestPublishKafka_2_0 {
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
+    /**
+     * Creates fresh mocks and a new test runner before each test. The mocked pool always hands out the
+     * mocked lease, and the runner is configured with {@link #TOPIC_NAME} and the replicated delivery guarantee.
+     */
+    @Before
+    public void setup() {
+        mockLease = mock(PublisherLease.class);
+        mockPool = mock(PublisherPool.class);
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        final PublishKafka_2_0 processor = new PublishKafka_2_0() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext context) {
+                return mockPool;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PublishKafka_2_0.TOPIC, TOPIC_NAME);
+        runner.setProperty(PublishKafka_2_0.DELIVERY_GUARANTEE, PublishKafka_2_0.DELIVERY_REPLICATED);
+    }
+
+    /**
+     * One FlowFile, stubbed result reports success: expects it on REL_SUCCESS, a single publish call,
+     * one complete(), one close() and no poison().
+     */
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("hello world");
+        final PublishResult allSucceeded = createAllSuccessPublishResult(enqueued, 1);
+        when(mockLease.complete()).thenReturn(allSucceeded);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 1);
+        verifyPublished(1);
+        verify(mockLease).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+    }
+
+    /**
+     * Three FlowFiles, stubbed result reports success for all: expects all three on REL_SUCCESS,
+     * three publish calls, one complete(), one close() and no poison().
+     */
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>(enqueueHelloWorld(3));
+        final PublishResult allSucceeded = createAllSuccessPublishResult(enqueued, 1);
+        when(mockLease.complete()).thenReturn(allSucceeded);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 3);
+        verifyPublished(3);
+        verify(mockLease).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+    }
+
+    /**
+     * One FlowFile, stubbed result reports it as failed: expects it on REL_FAILURE, a single publish call,
+     * one complete() and one close().
+     */
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile enqueued = runner.enqueue("hello world");
+        final PublishResult failed = createFailurePublishResult(enqueued);
+        when(mockLease.complete()).thenReturn(failed);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 1);
+        verifyPublished(1);
+        verify(mockLease).complete();
+        verify(mockLease).close();
+    }
+
+    /**
+     * Three FlowFiles, stubbed result reports all of them as failed: expects all three on REL_FAILURE,
+     * three publish calls, one complete() and one close().
+     */
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> enqueued = new HashSet<>(enqueueHelloWorld(3));
+        final PublishResult failed = createFailurePublishResult(enqueued);
+        when(mockLease.complete()).thenReturn(failed);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 3);
+        verifyPublished(3);
+        verify(mockLease).complete();
+        verify(mockLease).close();
+    }
+
+    /**
+     * Two FlowFiles with stubbed message counts of 10 and 20: expects both on REL_SUCCESS, each carrying a
+     * "msg.count" attribute, with exactly one FlowFile per count value.
+     */
+    @Test
+    public void testMultipleMessagesPerFlowFile() throws IOException {
+        final List<FlowFile> enqueued = enqueueHelloWorld(2);
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(enqueued.get(0), 10);
+        msgCounts.put(enqueued.get(1), 20);
+
+        final PublishResult stubbed = createPublishResult(msgCounts, new HashSet<>(enqueued), Collections.emptyMap());
+        when(mockLease.complete()).thenReturn(stubbed);
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_SUCCESS, 2);
+        verifyPublished(2);
+        verify(mockLease).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease).close();
+
+        runner.assertAllFlowFilesContainAttribute("msg.count");
+        assertEquals(1, countSuccessfulWithMsgCount("10"));
+        assertEquals(1, countSuccessfulWithMsgCount("20"));
+    }
+
+    /**
+     * Four FlowFiles where the stubbed result gives message counts to the first two and failure reasons to
+     * the last two: expects nothing on REL_SUCCESS, all four on REL_FAILURE, and no "msg.count" attribute
+     * on any of them.
+     */
+    @Test
+    public void testSomeSuccessSomeFailure() throws IOException {
+        final List<FlowFile> enqueued = enqueueHelloWorld(4);
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(enqueued.get(0), 10);
+        msgCounts.put(enqueued.get(1), 20);
+
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        for (final FlowFile failed : enqueued.subList(2, 4)) {
+            failureMap.put(failed, new RuntimeException("Intentional Unit Test Exception"));
+        }
+
+        final PublishResult stubbed = createPublishResult(msgCounts, new HashSet<>(enqueued.subList(0, 2)), failureMap);
+        when(mockLease.complete()).thenReturn(stubbed);
+
+        runner.run();
+
+        runner.assertTransferCount(PublishKafka_2_0.REL_SUCCESS, 0);
+        runner.assertTransferCount(PublishKafka_2_0.REL_FAILURE, 4);
+        verifyPublished(4);
+        verify(mockLease).complete();
+        verify(mockLease).close();
+
+        assertTrue(runner.getFlowFilesForRelationship(PublishKafka_2_0.REL_FAILURE).stream()
+            .allMatch(ff -> ff.getAttribute("msg.count") == null));
+    }
+
+    /**
+     * Enqueues {@code count} FlowFiles with the content "hello world" and returns them in enqueue order.
+     */
+    private List<FlowFile> enqueueHelloWorld(final int count) {
+        final List<FlowFile> enqueued = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            enqueued.add(runner.enqueue("hello world"));
+        }
+        return enqueued;
+    }
+
+    /**
+     * Verifies that the mocked lease's publish method was called {@code expectedCalls} times with any
+     * FlowFile, any InputStream, null for the third and fourth arguments, and {@link #TOPIC_NAME}.
+     */
+    private void verifyPublished(final int expectedCalls) throws IOException {
+        verify(mockLease, times(expectedCalls)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+    }
+
+    /**
+     * Counts the FlowFiles on REL_SUCCESS whose "msg.count" attribute equals {@code expected}.
+     */
+    private long countSuccessfulWithMsgCount(final String expected) {
+        return runner.getFlowFilesForRelationship(PublishKafka_2_0.REL_SUCCESS).stream()
+            .filter(ff -> ff.getAttribute("msg.count").equals(expected))
+            .count();
+    }
+
+    /**
+     * Convenience overload: an all-success result for a single FlowFile.
+     *
+     * @param successfulFlowFile the FlowFile to report as successful
+     * @param msgCount the successful message count to report for it
+     */
+    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
+        final Set<FlowFile> onlySuccess = Collections.singleton(successfulFlowFile);
+        return createAllSuccessPublishResult(onlySuccess, msgCount);
+    }
+
+    /**
+     * Builds a result with no failures in which every given FlowFile reports the same message count.
+     *
+     * @param successfulFlowFiles the FlowFiles to report as successful
+     * @param msgCountPerFlowFile the successful message count reported for each of them
+     */
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        successfulFlowFiles.forEach(ff -> msgCounts.put(ff, msgCountPerFlowFile));
+        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
+    }
+
+    /**
+     * Convenience overload: a failure result for a single FlowFile.
+     *
+     * @param failure the FlowFile to report as failed
+     */
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        final Set<FlowFile> onlyFailure = Collections.singleton(failure);
+        return createFailurePublishResult(onlyFailure);
+    }
+
+    /**
+     * Builds a result in which every given FlowFile failed, each with its own new {@link RuntimeException};
+     * no message counts and no successes are recorded.
+     *
+     * @param failures the FlowFiles to report as failed
+     */
+    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
+        final Map<FlowFile, Exception> failureMap = new HashMap<>();
+        for (final FlowFile failed : failures) {
+            failureMap.put(failed, new RuntimeException("Intentional Unit Test Exception"));
+        }
+        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
+    }
+
+    /**
+     * Creates a stub {@link PublishResult} that answers directly from the supplied collections.
+     *
+     * @param msgCounts successful message count per FlowFile; a FlowFile with no entry (or a null entry) reports 0
+     * @param successFlowFiles FlowFiles regarded as successful; only consulted by the sanity check
+     * @param failures failure reason per failed FlowFile; the result counts as a failure when this map is non-empty
+     * @throws IllegalArgumentException if a FlowFile is in {@code successFlowFiles} and is also a key of {@code failures}
+     */
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
+        // A FlowFile cannot be both a success and a failure; reject contradictory fixtures up front.
+        for (final FlowFile candidate : successFlowFiles) {
+            final boolean alsoFailed = failures.containsKey(candidate);
+            if (alsoFailed) {
+                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + candidate);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                final Integer sent = msgCounts.get(flowFile);
+                if (sent == null) {
+                    return 0;
+                }
+                return sent;
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}