You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/10/06 19:19:29 UTC
[3/5] nifi git commit: Revert "NIFI-2865: Refactored PublishKafka and
PublishKafka_0_10 to allow batching of FlowFiles within a single publish and
to let messages timeout if not acknowledged"
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
new file mode 100644
index 0000000..b3f1bd1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.times;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.verify;
+
+public class PublishKafkaTest {
+
+ @Test
+ public void validateCustomSerilaizerDeserializerSettings() throws Exception {
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3 sec");
+ runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ runner.assertValid();
+ runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
+ runner.assertNotValid();
+ }
+
+ /**
+ * Verifies that a non-time value for META_WAIT_TIME is rejected by property
+ * validation, with a message that names 'max.block.ms'.
+ */
+ @Test
+ public void validatePropertiesValidation() throws Exception {
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+ runner.setProperty(PublishKafka_0_10.TOPIC, "foo");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "foo");
+
+ try {
+ runner.assertValid();
+ // fail() throws AssertionError as well, so it is caught below; give it a
+ // message so a missed validation is reported clearly instead of as an NPE.
+ fail("Expected validation to fail for META_WAIT_TIME 'foo'");
+ } catch (AssertionError e) {
+ final String message = String.valueOf(e.getMessage());
+ assertTrue(message, message.contains("'max.block.ms' validated against 'foo' is invalid"));
+ }
+ }
+
+ /**
+ * Validates that the Kerberos service name is required when one of the SASL
+ * options is selected as the security protocol.
+ */
+ @Test
+ public void validateCustomValidation() {
+ String topicName = "validateCustomValidation";
+ PublishKafka_0_10 publishKafka = new PublishKafka_0_10();
+
+ TestRunner runner = TestRunners.newTestRunner(publishKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+ try {
+ runner.run();
+ // fail() is itself caught by the catch below; give it a message so an
+ // unexpected successful run is reported clearly instead of as an NPE.
+ fail("Expected run() to be rejected: SASL protocol set without a Kerberos service name");
+ } catch (Throwable e) {
+ final String message = String.valueOf(e.getMessage());
+ assertTrue(message, message.contains("'Kerberos Service Name' is invalid because"));
+ }
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateSingleCharacterDemarcatedMessages() {
+ String topicName = "validateSingleCharacterDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
+ runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
+ String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+ runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
+ String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+ StubPublishKafka putKafka = new StubPublishKafka(1);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "foo");
+
+ runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "3000 millis");
+
+ final String text = "Hello World\nGoodbye\nfail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ putKafka.destroy();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(1);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "Hello World\nGoodbye\nfail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "futurefail\nHello World\nGoodbye\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+ assertNotNull(ff);
+ runner.enqueue(ff);
+
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ // 5 sends due to duplication
+ verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
+ String topicName = "validateSendFailureAndThenResendSuccess";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+ runner.setProperty(PublishKafka_0_10.META_WAIT_TIME, "500 millis");
+
+ final String text = "Hello World\nGoodbye\nfuturefail\n2";
+ runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+ MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).get(0);
+ assertNotNull(ff);
+ runner.enqueue(ff);
+
+ runner.run(1, false);
+ assertEquals(0, runner.getQueueSize().getObjectCount());
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ // 6 sends due to duplication
+ verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateDemarcationIntoEmptyMessages() {
+ String topicName = "validateDemarcationIntoEmptyMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ final TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(PublishKafka_0_10.KEY, "key1");
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\n");
+
+ final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
+ runner.enqueue(bytes);
+ runner.run(1);
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexRightPartialDemarcatedMessages() {
+ String topicName = "validateComplexRightPartialDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
+ runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexLeftPartialDemarcatedMessages() {
+ String topicName = "validateComplexLeftPartialDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
+ runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void validateComplexPartialMatchDemarcatedMessages() {
+ String topicName = "validateComplexPartialMatchDemarcatedMessages";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
+
+ runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDBOOMSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
+ runner.run(1, false);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ Producer<byte[], byte[]> producer = putKafka.getProducer();
+ verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
+ runner.shutdown();
+ }
+
+ @Test
+ public void validateUtf8Key() {
+ String topicName = "validateUtf8Key";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+ final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+ runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ final Map<Object, Object> msgs = putKafka.getMessagesSent();
+ assertEquals(1, msgs.size());
+ final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+ assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+ }
+
+ /**
+ * Verifies that, with KEY_ATTRIBUTE_ENCODING set to hex, the key attribute
+ * value "6B657931" is sent as the raw bytes 0x6B 0x65 0x79 0x31.
+ */
+ @Test
+ public void validateHexKey() {
+ String topicName = "validateHexKey";
+ StubPublishKafka putKafka = new StubPublishKafka(100);
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+ runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
+ runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+ final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+ runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run(1);
+
+ runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+ final Map<Object, Object> msgs = putKafka.getMessagesSent();
+ assertEquals(1, msgs.size());
+ final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+ assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
new file mode 100644
index 0000000..76c29cd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+public class PublishingContextTest {
+
+ @Test
+ public void failInvalidConstructorArgs() {
+ try {
+ new PublishingContext(null, null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+ try {
+ new PublishingContext(mock(InputStream.class), null);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+
+ try {
+ new PublishingContext(mock(InputStream.class), "");
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+
+ try {
+ new PublishingContext(mock(InputStream.class), "mytopic", -3);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+ }
+
+ @Test
+ public void validateFullSetting() {
+ PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3);
+ publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
+ publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
+
+ assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
+ assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8));
+ assertEquals("topic", publishingContext.getTopic());
+ assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString());
+ }
+
+ @Test
+ public void validateOnlyOnceSetPerInstance() {
+ PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic");
+ publishingContext.setKeyBytes(new byte[]{0});
+ try {
+ publishingContext.setKeyBytes(new byte[]{0});
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+
+ publishingContext.setDelimiterBytes(new byte[]{0});
+ try {
+ publishingContext.setDelimiterBytes(new byte[]{0});
+ fail();
+ } catch (IllegalArgumentException e) {
+ // success
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
new file mode 100644
index 0000000..c009014
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import static org.mockito.Mockito.when;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class StubPublishKafka extends PublishKafka_0_10 {
+
+ private volatile Producer<byte[], byte[]> producer;
+
+ private volatile boolean failed;
+
+ private final int ackCheckSize;
+
+ private final ExecutorService executor = Executors.newCachedThreadPool();
+ private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
+
+ StubPublishKafka(int ackCheckSize) {
+ this.ackCheckSize = ackCheckSize;
+ }
+
+ public Producer<byte[], byte[]> getProducer() {
+ return producer;
+ }
+
+ public void destroy() {
+ this.executor.shutdownNow();
+ }
+
+ public Map<Object, Object> getMessagesSent() {
+ return new HashMap<>(msgsSent);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
+ throws ProcessException {
+ final Map<String, String> kafkaProperties = new HashMap<>();
+ KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+ kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ KafkaPublisher publisher;
+ try {
+ Field f = PublishKafka_0_10.class.getDeclaredField("brokers");
+ f.setAccessible(true);
+ f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
+ publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
+ publisher.setAckWaitTime(15000);
+ producer = mock(Producer.class);
+ this.instrumentProducer(producer, false);
+ Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
+ kf.setAccessible(true);
+ kf.set(publisher, producer);
+
+ Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
+ componentLogF.setAccessible(true);
+ componentLogF.set(publisher, mock(ComponentLog.class));
+
+ Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
+ ackCheckSizeField.setAccessible(true);
+ ackCheckSizeField.set(publisher, this.ackCheckSize);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IllegalStateException(e);
+ }
+ return publisher;
+ }
+
+ /**
+ * Stubs {@code send} on the given mock producer.
+ * <p>
+ * Each sent record with a non-null key and value is recorded in {@code msgsSent}.
+ * The first record whose UTF-8 value is "fail" makes {@code send} throw a
+ * RuntimeException; the first record whose value is "futurefail" makes the returned
+ * Future fail with a TopicAuthorizationException. Both cases share the single
+ * {@code failed} flag, so only one injected failure occurs per stub instance.
+ * Every other send yields a Future that completes with metadata for partition 0
+ * of topic "foo".
+ *
+ * @param producer the mock producer to instrument
+ * @param failRandomly currently unused
+ */
+ @SuppressWarnings("unchecked")
+ private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
+
+ when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+ final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+ final byte[] key = record == null ? null : record.key();
+ final byte[] payload = record == null ? null : record.value();
+ // ConcurrentHashMap rejects null keys and null values, so only track complete records
+ if (key != null && payload != null) {
+ msgsSent.put(key, payload);
+ }
+
+ // a null payload never matches either failure trigger below
+ final String value = payload == null ? null : new String(payload, StandardCharsets.UTF_8);
+ if ("fail".equals(value) && !StubPublishKafka.this.failed) {
+ StubPublishKafka.this.failed = true;
+ throw new RuntimeException("intentional");
+ }
+ Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
+ @Override
+ public RecordMetadata call() throws Exception {
+ if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
+ StubPublishKafka.this.failed = true;
+ throw new TopicAuthorizationException("Unauthorized");
+ } else {
+ TopicPartition partition = new TopicPartition("foo", 0);
+ RecordMetadata meta = new RecordMetadata(partition, 0, 0);
+ return meta;
+ }
+ }
+ });
+ return future;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
deleted file mode 100644
index e54a10c..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestInFlightMessageTracker {
-
- @Test(timeout = 5000L)
- public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
- final MockFlowFile flowFile = new MockFlowFile(1L);
-
- final InFlightMessageTracker tracker = new InFlightMessageTracker();
- tracker.incrementSentCount(flowFile);
-
- verifyNotComplete(tracker);
-
- tracker.incrementSentCount(flowFile);
- verifyNotComplete(tracker);
-
- tracker.incrementAcknowledgedCount(flowFile);
- verifyNotComplete(tracker);
-
- tracker.incrementAcknowledgedCount(flowFile);
- tracker.awaitCompletion(1L);
- }
-
- @Test(timeout = 5000L)
- public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
- final MockFlowFile flowFile = new MockFlowFile(1L);
-
- final InFlightMessageTracker tracker = new InFlightMessageTracker();
- tracker.incrementSentCount(flowFile);
-
- verifyNotComplete(tracker);
-
- tracker.incrementSentCount(flowFile);
- verifyNotComplete(tracker);
-
- final ExecutorService exec = Executors.newFixedThreadPool(1);
- final Future<?> future = exec.submit(() -> {
- try {
- tracker.awaitCompletion(10000L);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- tracker.incrementAcknowledgedCount(flowFile);
- tracker.incrementAcknowledgedCount(flowFile);
-
- future.get();
- }
-
- private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
- try {
- tracker.awaitCompletion(10L);
- Assert.fail("Expected timeout");
- } catch (final TimeoutException te) {
- // expected
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
deleted file mode 100644
index c7d1a60..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestPublishKafka {
- private static final String TOPIC_NAME = "unit-test";
-
- private PublisherPool mockPool;
- private PublisherLease mockLease;
- private TestRunner runner;
-
- @Before
- public void setup() {
- mockPool = mock(PublisherPool.class);
- mockLease = mock(PublisherLease.class);
-
- when(mockPool.obtainPublisher()).thenReturn(mockLease);
-
- runner = TestRunners.newTestRunner(new PublishKafka_0_10() {
- @Override
- protected PublisherPool createPublisherPool(final ProcessContext context) {
- return mockPool;
- }
- });
-
- runner.setProperty(PublishKafka_0_10.TOPIC, TOPIC_NAME);
- }
-
- @Test
- public void testSingleSuccess() throws IOException {
- final MockFlowFile flowFile = runner.enqueue("hello world");
-
- when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));
-
- runner.run();
- runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
-
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(0)).poison();
- verify(mockLease, times(1)).close();
- }
-
- @Test
- public void testMultipleSuccess() throws IOException {
- final Set<FlowFile> flowFiles = new HashSet<>();
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
-
-
- when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
-
- runner.run();
- runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 3);
-
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(0)).poison();
- verify(mockLease, times(1)).close();
- }
-
- @Test
- public void testSingleFailure() throws IOException {
- final MockFlowFile flowFile = runner.enqueue("hello world");
-
- when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
-
- runner.run();
- runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 1);
-
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(1)).close();
- }
-
- @Test
- public void testMultipleFailures() throws IOException {
- final Set<FlowFile> flowFiles = new HashSet<>();
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
-
- when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
-
- runner.run();
- runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_FAILURE, 3);
-
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(1)).close();
- }
-
- @Test
- public void testMultipleMessagesPerFlowFile() throws IOException {
- final List<FlowFile> flowFiles = new ArrayList<>();
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
-
- final Map<FlowFile, Integer> msgCounts = new HashMap<>();
- msgCounts.put(flowFiles.get(0), 10);
- msgCounts.put(flowFiles.get(1), 20);
-
- final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
-
- when(mockLease.complete()).thenReturn(result);
-
- runner.run();
- runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 2);
-
- verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(0)).poison();
- verify(mockLease, times(1)).close();
-
- runner.assertAllFlowFilesContainAttribute("msg.count");
- assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
- .filter(ff -> ff.getAttribute("msg.count").equals("10"))
- .count());
- assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
- .filter(ff -> ff.getAttribute("msg.count").equals("20"))
- .count());
- }
-
-
- @Test
- public void testSomeSuccessSomeFailure() throws IOException {
- final List<FlowFile> flowFiles = new ArrayList<>();
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
- flowFiles.add(runner.enqueue("hello world"));
-
- final Map<FlowFile, Integer> msgCounts = new HashMap<>();
- msgCounts.put(flowFiles.get(0), 10);
- msgCounts.put(flowFiles.get(1), 20);
-
- final Map<FlowFile, Exception> failureMap = new HashMap<>();
- failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
- failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));
-
- final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);
-
- when(mockLease.complete()).thenReturn(result);
-
- runner.run();
- runner.assertTransferCount(PublishKafka_0_10.REL_SUCCESS, 2);
- runner.assertTransferCount(PublishKafka_0_10.REL_FAILURE, 2);
-
- verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
- verify(mockLease, times(1)).complete();
- verify(mockLease, times(1)).close();
-
- assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
- .filter(ff -> "10".equals(ff.getAttribute("msg.count")))
- .count());
- assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_SUCCESS).stream()
- .filter(ff -> "20".equals(ff.getAttribute("msg.count")))
- .count());
-
- assertTrue(runner.getFlowFilesForRelationship(PublishKafka_0_10.REL_FAILURE).stream()
- .noneMatch(ff -> ff.getAttribute("msg.count") != null));
- }
-
-
- private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
- return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
- }
-
- private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
- final Map<FlowFile, Integer> msgCounts = new HashMap<>();
- for (final FlowFile ff : successfulFlowFiles) {
- msgCounts.put(ff, msgCountPerFlowFile);
- }
- return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
- }
-
- private PublishResult createFailurePublishResult(final FlowFile failure) {
- return createFailurePublishResult(Collections.singleton(failure));
- }
-
- private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
- final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
- return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
- }
-
- private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
- // sanity check.
- for (final FlowFile success : successFlowFiles) {
- if (failures.containsKey(success)) {
- throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
- }
- }
-
- return new PublishResult() {
- @Override
- public Collection<FlowFile> getSuccessfulFlowFiles() {
- return successFlowFiles;
- }
-
- @Override
- public Collection<FlowFile> getFailedFlowFiles() {
- return failures.keySet();
- }
-
- @Override
- public int getSuccessfulMessageCount(FlowFile flowFile) {
- Integer count = msgCounts.get(flowFile);
- return count == null ? 0 : count.intValue();
- }
-
- @Override
- public Exception getReasonForFailure(FlowFile flowFile) {
- return failures.get(flowFile);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
deleted file mode 100644
index c2d143c..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-
-public class TestPublisherLease {
- private ComponentLog logger;
- private Producer<byte[], byte[]> producer;
-
- @Before
- @SuppressWarnings("unchecked")
- public void setup() {
- logger = Mockito.mock(ComponentLog.class);
- producer = Mockito.mock(Producer.class);
- }
-
- @Test
- public void testPoisonOnException() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
- @Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
-
- final FlowFile flowFile = new MockFlowFile(1L);
- final String topic = "unit-test";
- final byte[] messageKey = null;
- final byte[] demarcatorBytes = null;
-
- final InputStream failureInputStream = new InputStream() {
- @Override
- public int read() throws IOException {
- throw new IOException("Intentional Unit Test Exception");
- }
- };
-
- try {
- lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
- Assert.fail("Expected IOException");
- } catch (final IOException ioe) {
- // expected
- }
-
- assertEquals(1, poisonCount.get());
-
- final PublishResult result = lease.complete();
- assertTrue(result.getFailedFlowFiles().contains(flowFile));
- assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPoisonOnFailure() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger) {
- @Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
-
- final FlowFile flowFile = new MockFlowFile(1L);
- final String topic = "unit-test";
- final byte[] messageKey = null;
- final byte[] demarcatorBytes = null;
-
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
- final Callback callback = invocation.getArgumentAt(1, Callback.class);
- callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
- return null;
- }
- }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
-
- lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);
-
- assertEquals(1, poisonCount.get());
-
- final PublishResult result = lease.complete();
- assertTrue(result.getFailedFlowFiles().contains(flowFile));
- assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testAllDelimitedMessagesSent() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
- @Override
- protected void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
-
- final AtomicInteger correctMessages = new AtomicInteger(0);
- final AtomicInteger incorrectMessages = new AtomicInteger(0);
- doAnswer(new Answer<Object>() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
- final byte[] value = record.value();
- final String valueString = new String(value, StandardCharsets.UTF_8);
- if ("1234567890".equals(valueString)) {
- correctMessages.incrementAndGet();
- } else {
- incorrectMessages.incrementAndGet();
- }
-
- return null;
- }
- }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
-
- final FlowFile flowFile = new MockFlowFile(1L);
- final String topic = "unit-test";
- final byte[] messageKey = null;
- final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);
-
- final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
-
- final byte[] flowFileContent2 = new byte[0];
- lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);
-
- final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
- lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);
-
- final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
- lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);
-
- assertEquals(0, poisonCount.get());
-
- verify(producer, times(0)).flush();
-
- final PublishResult result = lease.complete();
- assertTrue(result.getFailedFlowFiles().contains(flowFile));
- assertFalse(result.getSuccessfulFlowFiles().contains(flowFile));
-
- assertEquals(7, correctMessages.get());
- assertEquals(0, incorrectMessages.get());
-
- verify(producer, times(1)).flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
deleted file mode 100644
index 7c70194..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-public class TestPublisherPool {
-
- @Test
- public void testLeaseCloseReturnsToPool() {
- final Map<String, Object> kafkaProperties = new HashMap<>();
- kafkaProperties.put("bootstrap.servers", "localhost:1111");
- kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
- kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
-
- final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
- assertEquals(0, pool.available());
-
- final PublisherLease lease = pool.obtainPublisher();
- assertEquals(0, pool.available());
-
- lease.close();
- assertEquals(1, pool.available());
- }
-
- @Test
- public void testPoisonedLeaseNotReturnedToPool() {
- final Map<String, Object> kafkaProperties = new HashMap<>();
- kafkaProperties.put("bootstrap.servers", "localhost:1111");
- kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
- kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
-
- final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L);
- assertEquals(0, pool.available());
-
- final PublisherLease lease = pool.obtainPublisher();
- assertEquals(0, pool.available());
-
- lease.poison();
- lease.close();
- assertEquals(0, pool.available());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
new file mode 100644
index 0000000..0ed00fb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+import kafka.producer.KeyedMessage;
+import kafka.producer.OldProducer;
+
+/**
+ * Helper class which helps to produce events targeting {@link EmbeddedKafka}
+ * server.
+ */
+public class EmbeddedKafkaProducerHelper implements Closeable {
+
+ private final EmbeddedKafka kafkaServer;
+
+ private final OldProducer producer;
+
+ /**
+ * Will create an instance of EmbeddedKafkaProducerHelper based on default
+ * configurations.<br>
+ * Default configuration includes:<br>
+ * <i>
+ * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+ * serializer.class=kafka.serializer.DefaultEncoder<br>
+ * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+ * auto.create.topics.enable=true
+ * </i><br>
+ * <br>
+ * If you wish to supply additional configuration properties or override
+ * existing use
+ * {@link EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, Properties)}
+ * constructor.
+ *
+ * @param kafkaServer
+ * instance of {@link EmbeddedKafka}
+ */
+ public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) {
+ this(kafkaServer, null);
+ }
+
+ /**
+ * Will create an instance of EmbeddedKafkaProducerHelper based on default
+ * configurations and additional configuration properties.<br>
+ * Default configuration includes:<br>
+ * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+ * serializer.class=kafka.serializer.DefaultEncoder<br>
+ * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+ * auto.create.topics.enable=true<br>
+ * <br>
+ *
+ * @param kafkaServer
+ * instance of {@link EmbeddedKafka}
+ * @param additionalProperties
+ * instance of {@link Properties} specifying additional producer
+ * configuration properties.
+ */
+ public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties additionalProperties) {
+ this.kafkaServer = kafkaServer;
+ Properties producerProperties = new Properties();
+ producerProperties.put("metadata.broker.list", "localhost:" + this.kafkaServer.getKafkaPort());
+ producerProperties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+ producerProperties.put("key.serializer.class", "kafka.serializer.DefaultEncoder");
+ producerProperties.put("auto.create.topics.enable", "true");
+ if (additionalProperties != null) {
+ producerProperties.putAll(additionalProperties);
+ }
+ this.producer = new OldProducer(producerProperties);
+ }
+
+ /**
+ * Will send an event to a Kafka topic. If topic doesn't exist it will be
+ * auto-created.
+ *
+ * @param topicName
+ * Kafka topic name.
+ * @param event
+ * string representing an event(message) to be sent to Kafka.
+ */
+ public void sendEvent(String topicName, String event) {
+ KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], byte[]>(topicName, event.getBytes());
+ this.producer.send(data.topic(), data.key(), data.message());
+ }
+
+ /**
+ * Will close the underlying Kafka producer.
+ */
+ @Override
+ public void close() throws IOException {
+ this.producer.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index e524589..5bc0e0e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.kafka;
import java.io.Closeable;
-import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
@@ -112,9 +111,8 @@ class KafkaPublisher implements Closeable {
* instance of {@link PublishingContext} which hold context
* information about the message(s) to be sent.
* @return The index of the last successful offset.
- * @throws IOException if unable to read from the Input Stream
*/
- KafkaPublisherResult publish(PublishingContext publishingContext) throws IOException {
+ KafkaPublisherResult publish(PublishingContext publishingContext) {
StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 64fdb1d..5792e64 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -223,8 +223,7 @@ public class ConsumeKafka extends AbstractProcessor {
final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
: null;
-
- final Map<String, Object> props = new HashMap<>();
+ final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e13a8c3..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -74,7 +74,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
- final Map<String, Object> kafkaProperties,
+ final Map<String, String> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String keyEncoding,
@@ -115,7 +115,6 @@ public class ConsumerPool implements Closeable {
* sitting idle which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
-
/**
* This subscription tightly couples the lease to the given
* consumer. They cannot be separated from then on.
@@ -149,7 +148,7 @@ public class ConsumerPool implements Closeable {
});
}
- private void closeConsumer(final Consumer<?, ?> consumer) {
+ private void closeConsumer(final Consumer consumer) {
consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
deleted file mode 100644
index e7d5cb7..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.flowfile.FlowFile;
-
-public class InFlightMessageTracker {
- private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
- private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
- private final Object progressMutex = new Object();
-
- public void incrementAcknowledgedCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
- counter.incrementAcknowledgedCount();
-
- synchronized (progressMutex) {
- progressMutex.notify();
- }
- }
-
- public int getAcknowledgedCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.get(flowFile);
- return (counter == null) ? 0 : counter.getAcknowledgedCount();
- }
-
- public void incrementSentCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
- counter.incrementSentCount();
- }
-
- public int getSentCount(final FlowFile flowFile) {
- final Counts counter = messageCountsByFlowFile.get(flowFile);
- return (counter == null) ? 0 : counter.getSentCount();
- }
-
- public void fail(final FlowFile flowFile, final Exception exception) {
- failures.putIfAbsent(flowFile, exception);
-
- synchronized (progressMutex) {
- progressMutex.notify();
- }
- }
-
- public Exception getFailure(final FlowFile flowFile) {
- return failures.get(flowFile);
- }
-
- public boolean isFailed(final FlowFile flowFile) {
- return getFailure(flowFile) != null;
- }
-
- public void reset() {
- messageCountsByFlowFile.clear();
- failures.clear();
- }
-
- public PublishResult failOutstanding(final Exception exception) {
- messageCountsByFlowFile.keySet().stream()
- .filter(ff -> !isComplete(ff))
- .filter(ff -> !failures.containsKey(ff))
- .forEach(ff -> failures.put(ff, exception));
-
- return createPublishResult();
- }
-
- private boolean isComplete(final FlowFile flowFile) {
- final Counts counts = messageCountsByFlowFile.get(flowFile);
- if (counts.getAcknowledgedCount() == counts.getSentCount()) {
- // all messages received successfully.
- return true;
- }
-
- if (failures.containsKey(flowFile)) {
- // FlowFile failed so is complete
- return true;
- }
-
- return false;
- }
-
- private boolean isComplete() {
- return messageCountsByFlowFile.keySet().stream()
- .allMatch(flowFile -> isComplete(flowFile));
- }
-
- void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
- final long startTime = System.nanoTime();
- final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
-
- while (System.nanoTime() < maxTime) {
- synchronized (progressMutex) {
- if (isComplete()) {
- return;
- }
-
- progressMutex.wait(millis);
- }
- }
-
- throw new TimeoutException();
- }
-
-
- PublishResult createPublishResult() {
- return new PublishResult() {
- @Override
- public Collection<FlowFile> getSuccessfulFlowFiles() {
- if (failures.isEmpty()) {
- return messageCountsByFlowFile.keySet();
- }
-
- final Set<FlowFile> flowFiles = new HashSet<>(messageCountsByFlowFile.keySet());
- flowFiles.removeAll(failures.keySet());
- return flowFiles;
- }
-
- @Override
- public Collection<FlowFile> getFailedFlowFiles() {
- return failures.keySet();
- }
-
- @Override
- public int getSuccessfulMessageCount(final FlowFile flowFile) {
- return getAcknowledgedCount(flowFile);
- }
-
- @Override
- public Exception getReasonForFailure(final FlowFile flowFile) {
- return getFailure(flowFile);
- }
- };
- }
-
- public static class Counts {
- private final AtomicInteger sentCount = new AtomicInteger(0);
- private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
-
- public void incrementSentCount() {
- sentCount.incrementAndGet();
- }
-
- public void incrementAcknowledgedCount() {
- acknowledgedCount.incrementAndGet();
- }
-
- public int getAcknowledgedCount() {
- return acknowledgedCount.get();
- }
-
- public int getSentCount() {
- return sentCount.get();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a4ed6221/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3d09f2d..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -27,9 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.CommonClientConfigs;
+
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
@@ -187,7 +186,7 @@ final class KafkaProcessorUtils {
final Class<?> classType;
- public KafkaConfigValidator(final Class<?> classType) {
+ public KafkaConfigValidator(final Class classType) {
this.classType = classType;
}
@@ -212,8 +211,7 @@ final class KafkaProcessorUtils {
return builder.toString();
}
-
- static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+ static void buildCommonKafkaProperties(final ProcessContext context, final Class kafkaConfigClass, final Map<String, String> mapToPopulate) {
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
// Translate SSLContext Service configuration into Kafka properties
@@ -232,33 +230,28 @@ final class KafkaProcessorUtils {
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
}
}
-
- String propertyName = propertyDescriptor.getName();
- String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+ String pName = propertyDescriptor.getName();
+ String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
-
- if (propertyValue != null) {
- // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
- // or the standard NiFi time period such as "5 secs"
- if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
- propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
+ if (pValue != null) {
+ if (pName.endsWith(".ms")) { // kafka standard time notation
+ pValue = String.valueOf(FormatUtils.getTimeDuration(pValue.trim(), TimeUnit.MILLISECONDS));
}
-
- if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
- mapToPopulate.put(propertyName, propertyValue);
+ if (isStaticStringFieldNamePresent(pName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+ mapToPopulate.put(pName, pValue);
}
}
}
}
- private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+ private static boolean isStaticStringFieldNamePresent(final String name, final Class... classes) {
return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
}
- private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
+ private static Set<String> getPublicStaticStringFieldValues(final Class... classes) {
final Set<String> strings = new HashSet<>();
- for (final Class<?> classType : classes) {
+ for (final Class classType : classes) {
for (final Field field : classType.getDeclaredFields()) {
if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
try {