Posted to commits@nifi.apache.org by jp...@apache.org on 2016/12/15 18:51:46 UTC
[07/20] nifi git commit: NIFI-3198: Refactored how PublishKafka and
PublishKafka_0_10 work to improve throughput and resilience. Fixed bug in
StreamDemarcator. Slight refactoring of consume processors to simplify code.
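The refactor replaces the hand-rolled MockConsumerPool in these tests with Mockito mocks of ConsumerPool and ConsumerLease, so the tests now verify the lease lifecycle rather than FlowFile contents. A minimal sketch of the consume loop that the verifications below imply, inferred from the mock expectations and not copied from the NiFi source:

    // Hypothetical onTrigger body implied by the test verifications:
    // one lease per trigger, poll while the lease allows, commit once, close.
    try (final ConsumerLease lease = pool.obtainConsumer(session)) {
        if (lease == null) {
            context.yield(); // assumption: no consumer currently available
            return;
        }
        while (lease.continuePolling()) {
            lease.poll(); // each poll may add records/FlowFiles to the session
        }
        if (!lease.commit()) {
            context.yield(); // assumption: back off when the commit fails
        }
    } // try-with-resources guarantees lease.close()

With continuePolling() stubbed to return true, true, false, this loop produces exactly the interaction counts asserted below: three continuePolling() calls, two poll() calls, one commit(), and one close().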
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7e4b12c..8e3fa3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,105 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
- static class MockConsumerPool extends ConsumerPool {
-
- final int actualMaxLeases;
- final List<String> actualTopics;
- final Map<String, String> actualKafkaProperties;
- boolean throwKafkaExceptionOnPoll = false;
- boolean throwKafkaExceptionOnCommit = false;
- Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
- Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
- Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
- boolean wasConsumerLeasePoisoned = false;
- boolean wasConsumerLeaseClosed = false;
- boolean wasPoolClosed = false;
-
- public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
- super(maxLeases, topics, kafkaProperties, null);
- actualMaxLeases = maxLeases;
- actualTopics = topics;
- actualKafkaProperties = kafkaProperties;
- }
-
- @Override
- public ConsumerLease obtainConsumer() {
- return new ConsumerLease() {
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
- if (throwKafkaExceptionOnPoll) {
- throw new KafkaException("i planned to fail");
- }
- final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
- return (records == null) ? ConsumerRecords.empty() : records;
- }
-
- @Override
- public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
- if (throwKafkaExceptionOnCommit) {
- throw new KafkaException("i planned to fail");
- }
- actualCommitOffsets = offsets;
- }
-
- @Override
- public void poison() {
- wasConsumerLeasePoisoned = true;
- }
-
- @Override
- public void close() {
- wasConsumerLeaseClosed = true;
- }
- };
- }
-
- @Override
- public void close() {
- wasPoolClosed = true;
- }
-
- void resetState() {
- throwKafkaExceptionOnPoll = false;
- throwKafkaExceptionOnCommit = false;
- nextPlannedRecordsQueue = null;
- nextExpectedCommitOffsets = null;
- wasConsumerLeasePoisoned = false;
- wasConsumerLeaseClosed = false;
- wasPoolClosed = false;
- }
+ Consumer<byte[], byte[]> mockConsumer = null;
+ ConsumerLease mockLease = null;
+ ConsumerPool mockConsumerPool = null;
+ @Before
+ public void setup() {
+ mockConsumer = mock(Consumer.class);
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@@ -175,365 +106,45 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- } else {
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- }
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- }
-
- @Test
- public void validateGetLotsOfMessages() throws Exception {
- String groupName = "validateGetLotsOfMessages";
-
- final byte[][] firstPassValues = new byte[10010][1];
- for (final byte[] value : firstPassValues) {
- value[0] = 0x12;
- }
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
- assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
- assertEquals(1, mockPool.actualCommitOffsets.size());
- assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
- for (final byte[] rawRecord : rawRecords) {
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
-
- for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
- final byte[] key = entry.getKey();
- final byte[] rawRecord = entry.getValue();
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- for (final ConsumerRecords<byte[], byte[]> rec : records) {
- rec.partitions().stream().forEach((part) -> {
- final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
- if (map.get(part) != null) {
- throw new IllegalStateException("already have that topic/partition in the record map");
- }
- map.put(part, conRecs);
- });
- }
- return new ConsumerRecords<>(map);
- }
-
- @Test
- public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
- String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues),
- createConsumerRecords("bar", 1, 1L, secondPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
-
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(2, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- }
-
- @Test
- public void validatePollException() throws Exception {
- String groupName = "validatePollException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnPoll = true;
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(0, flowFiles.size());
- assertNull(null, mockPool.actualCommitOffsets);
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateCommitOffsetException() throws Exception {
- String groupName = "validateCommitOffsetException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnCommit = true;
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(1, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertNull(null, mockPool.actualCommitOffsets);
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
@Test
- public void validateUtf8Key() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
+ public void validateGetErrorMessages() throws Exception {
+ String groupName = "validateGetErrorMessages";
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(true, false);
+ when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -542,89 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(2)).continuePolling();
+ verify(mockLease, times(1)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
- @Test
- public void validateHexKey() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //assert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
+ ProcessSession mockSession = null;
+ ProvenanceReporter mockReporter = null;
+ ConsumerPool testPool = null;
+ ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
- }
-
- @Test
- public void validatePoolSimpleCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+ mockSession = mock(ProcessSession.class);
+ mockReporter = mock(ProvenanceReporter.class);
+ when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+ testPool = new ConsumerPool(
+ 1,
+ null,
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
+ testDemarcatedPool = new ConsumerPool(
+ 1,
+ "--demarcator--".getBytes(StandardCharsets.UTF_8),
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer;
+ }
+ };
+ }
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ @Test
+ public void validatePoolSimpleCreateClose() throws Exception {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
- lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
- assertEquals(1, stats.leasesObtainedCount);
- assertEquals(1, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
+ assertEquals(4, stats.leasesObtainedCount);
}
@Test
- public void validatePoolSimpleBatchCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolSimpleCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ lease.commit();
+ }
+ testPool.close();
+ verify(mockSession, times(3)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ @Test
+ public void validatePoolSimpleBatchCreateClose() throws Exception {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
- lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
- assertEquals(10000, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
@Test
- public void validatePoolConsumerFails() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolBatchCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
- fail();
- } catch (final KafkaException ke) {
+ lease.commit();
+ }
+ testDemarcatedPool.close();
+ verify(mockSession, times(1)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testDemarcatedPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+
+ @Test
+ public void validatePoolConsumerFails() throws Exception {
+
+ when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ try {
+ lease.poll();
+ fail();
+ } catch (final KafkaException ke) {
+ }
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
- assertEquals(0, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+ final TopicPartition tPart = new TopicPartition(topic, partition);
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ long offset = startingOffset;
+ for (final byte[] rawRecord : rawRecords) {
+ final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+ records.add(rec);
+ }
+ map.put(tPart, records);
+ return new ConsumerRecords(map);
+ }
+
}
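For context on the constructor used in the setup above: the old pool took (maxLeases, topics, properties, logger), while the new one is configured positionally. The names below are descriptive guesses read off the test values, not the declared parameter names in ConsumerPool:

    // Assumed meaning of each argument, inferred from ConsumerPoolTest.setup():
    testPool = new ConsumerPool(
            1,                                  // max simultaneous leases
            null,                               // demarcator bytes; null = one FlowFile per record
            Collections.emptyMap(),             // extra Kafka client properties
            Collections.singletonList("nifi"),  // topics to subscribe to
            100L,                               // max poll/uncommitted wait in millis (guess)
            "utf-8",                            // key attribute encoding
            "ssl",                              // security protocol
            "localhost",                        // bootstrap servers
            logger);                            // component log

The demarcator argument is what separates the two pools in the tests: testPool (null demarcator) creates one FlowFile per record, hence verify(mockSession, times(3)).create() for three records, while testDemarcatedPool merges all three records into a single FlowFile, hence times(1).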
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
deleted file mode 100644
index 19c64af..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-public class KafkaPublisherTest {
-
- private static EmbeddedKafka kafkaLocal;
-
- private static EmbeddedKafkaProducerHelper producerHelper;
-
- @BeforeClass
- public static void beforeClass() {
- kafkaLocal = new EmbeddedKafka();
- kafkaLocal.start();
- producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- producerHelper.close();
- kafkaLocal.stop();
- }
-
- @Test
- public void validateSuccessfulSendAsWhole() throws Exception {
- InputStream contentStream = new ByteArrayInputStream("Hello Kafka".getBytes(StandardCharsets.UTF_8));
- String topicName = "validateSuccessfulSendAsWhole";
-
- Properties kafkaProperties = this.buildProducerProperties();
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
- KafkaPublisherResult result = publisher.publish(publishingContext);
-
- assertEquals(0, result.getLastMessageAcked());
- assertEquals(1, result.getMessagesSent());
- contentStream.close();
- publisher.close();
-
- ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- assertNotNull(iter.next());
- try {
- iter.next();
- } catch (ConsumerTimeoutException e) {
- // that's OK since this is the Kafka mechanism to unblock
- }
- }
-
- @Test
- public void validateSuccessfulSendAsDelimited() throws Exception {
- InputStream contentStream = new ByteArrayInputStream(
- "Hello Kafka\nHello Kafka\nHello Kafka\nHello Kafka\n".getBytes(StandardCharsets.UTF_8));
- String topicName = "validateSuccessfulSendAsDelimited";
-
- Properties kafkaProperties = this.buildProducerProperties();
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
- publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
- KafkaPublisherResult result = publisher.publish(publishingContext);
-
- assertEquals(3, result.getLastMessageAcked());
- assertEquals(4, result.getMessagesSent());
- contentStream.close();
- publisher.close();
-
- ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- assertNotNull(iter.next());
- assertNotNull(iter.next());
- assertNotNull(iter.next());
- assertNotNull(iter.next());
- try {
- iter.next();
- fail();
- } catch (ConsumerTimeoutException e) {
- // that's OK since this is the Kafka mechanism to unblock
- }
- }
-
- /*
- * This test simulates the condition where not all messages were ACKed by
- * Kafka
- */
- @Test
- public void validateRetries() throws Exception {
- byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
- InputStream contentStream = new ByteArrayInputStream(testValue);
- String topicName = "validateSuccessfulReSendOfFailedSegments";
-
- Properties kafkaProperties = this.buildProducerProperties();
-
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
- // simulates the first re-try
- int lastAckedMessageIndex = 1;
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
- publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
- publisher.publish(publishingContext);
-
- ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- String m1 = new String(iter.next().message());
- String m2 = new String(iter.next().message());
- assertEquals("Hello Kafka3", m1);
- assertEquals("Hello Kafka4", m2);
- try {
- iter.next();
- fail();
- } catch (ConsumerTimeoutException e) {
- // that's OK since this is the Kafka mechanism to unblock
- }
-
- // simulates the second re-try
- lastAckedMessageIndex = 2;
- contentStream = new ByteArrayInputStream(testValue);
- publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
- publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
- publisher.publish(publishingContext);
-
- m1 = new String(iter.next().message());
- assertEquals("Hello Kafka4", m1);
-
- publisher.close();
- }
-
- /*
- * Similar to the above test, but it sets the first retry index to the last
- * possible message index and second index to an out of bound index. The
- * expectation is that no messages will be sent to Kafka
- */
- @Test
- public void validateRetriesWithWrongIndex() throws Exception {
- byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello Kafka4\n".getBytes(StandardCharsets.UTF_8);
- InputStream contentStream = new ByteArrayInputStream(testValue);
- String topicName = "validateRetriesWithWrongIndex";
-
- Properties kafkaProperties = this.buildProducerProperties();
-
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
-
- // simulates the first re-try
- int lastAckedMessageIndex = 3;
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
- publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
- publisher.publish(publishingContext);
-
- ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- try {
- iter.next();
- fail();
- } catch (ConsumerTimeoutException e) {
- // that's OK since this is the Kafka mechanism to unblock
- }
-
- // simulates the second re-try
- lastAckedMessageIndex = 6;
- contentStream = new ByteArrayInputStream(testValue);
- publishingContext = new PublishingContext(contentStream, topicName, lastAckedMessageIndex);
- publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
- publisher.publish(publishingContext);
- try {
- iter.next();
- fail();
- } catch (ConsumerTimeoutException e) {
- // that's OK since this is the Kafka mechanism to unblock
- }
-
- publisher.close();
- }
-
- @Test
- public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
- String data = "\u50e0THIS IS MY NEW TEXT.\u50e0IT HAS A NEWLINE.";
- InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
- String topicName = "validateWithMultiByteCharacters";
-
- Properties kafkaProperties = this.buildProducerProperties();
-
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
-
- publisher.publish(publishingContext);
- publisher.close();
-
- ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
- String r = new String(iter.next().message(), StandardCharsets.UTF_8);
- assertEquals(data, r);
- }
-
- @Test
- public void validateWithNonDefaultPartitioner() throws Exception {
- String data = "fooandbarandbaz";
- InputStream contentStream = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
- String topicName = "validateWithNonDefaultPartitioner";
-
- Properties kafkaProperties = this.buildProducerProperties();
- kafkaProperties.setProperty("partitioner.class", TestPartitioner.class.getName());
- KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, mock(ComponentLog.class));
- PublishingContext publishingContext = new PublishingContext(contentStream, topicName);
- publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
-
- try {
- publisher.publish(publishingContext);
- // partitioner should be invoked 3 times
- assertTrue(TestPartitioner.counter == 3);
- publisher.close();
- } finally {
- TestPartitioner.counter = 0;
- }
- }
-
- private Properties buildProducerProperties() {
- Properties kafkaProperties = new Properties();
- kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- kafkaProperties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaLocal.getKafkaPort());
- kafkaProperties.put("auto.create.topics.enable", "true");
- return kafkaProperties;
- }
-
- private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
- Properties props = new Properties();
- props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort());
- props.put("group.id", "test");
- props.put("consumer.timeout.ms", "500");
- props.put("auto.offset.reset", "smallest");
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
- ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
- Map<String, Integer> topicCountMap = new HashMap<>(1);
- topicCountMap.put(topic, 1);
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
- ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
- return iter;
- }
-
- public static class TestPartitioner implements Partitioner {
-
- static int counter;
-
- @Override
- public void configure(Map<String, ?> configs) {
- // nothing to do, test
- }
-
- @Override
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
- Cluster cluster) {
- counter++;
- return 0;
- }
-
- @Override
- public void close() {
- counter = 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
deleted file mode 100644
index d81f0c1..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-import static org.mockito.Mockito.times;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.verify;
-
-public class PublishKafkaTest {
-
- @Test
- public void validateCustomSerilaizerDeserializerSettings() throws Exception {
- PublishKafka publishKafka = new PublishKafka();
- TestRunner runner = TestRunners.newTestRunner(publishKafka);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
- runner.setProperty(PublishKafka.TOPIC, "foo");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec");
- runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- runner.assertValid();
- runner.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Foo");
- runner.assertNotValid();
- }
-
- @Test
- public void validatePropertiesValidation() throws Exception {
- PublishKafka publishKafka = new PublishKafka();
- TestRunner runner = TestRunners.newTestRunner(publishKafka);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
- runner.setProperty(PublishKafka.TOPIC, "foo");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "foo");
-
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("'max.block.ms' validated against 'foo' is invalid"));
- }
- }
-
- @Test
- public void validateCustomValidation() {
- String topicName = "validateCustomValidation";
- PublishKafka publishKafka = new PublishKafka();
-
- /*
- * Validates that a Kerberos principal is required if one of the SASL options
- * is set for the security protocol
- */
- TestRunner runner = TestRunners.newTestRunner(publishKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
- try {
- runner.run();
- fail();
- } catch (Throwable e) {
- assertTrue(e.getMessage().contains("'Kerberos Service Name' is invalid because"));
- }
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateSingleCharacterDemarcatedMessages() {
- String topicName = "validateSingleCharacterDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
- runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
- String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
- runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
- String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
- StubPublishKafka putKafka = new StubPublishKafka(1);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.PARTITION_CLASS, Partitioners.RoundRobinPartitioner.class.getName());
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
- runner.enqueue("Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
- String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka(100);
-
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
-
- final String text = "Hello World\nGoodbye\nfail\n2";
- runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- putKafka.destroy();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
- String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka(1);
-
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
- final String text = "Hello World\nGoodbye\nfail\n2";
- runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- assertEquals(1, runner.getQueueSize().getObjectCount()); // due to failure
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws Exception {
- String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka(100);
-
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
- final String text = "futurefail\nHello World\nGoodbye\n2";
- runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
- assertNotNull(ff);
- runner.enqueue(ff);
-
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- // 5 sends due to duplication (matches the times(5) verification below)
- verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateOnFutureGetFailureAndThenResendSuccess() throws Exception {
- String topicName = "validateSendFailureAndThenResendSuccess";
- StubPublishKafka putKafka = new StubPublishKafka(100);
-
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
- runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
- final String text = "Hello World\nGoodbye\nfuturefail\n2";
- runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
- MockFlowFile ff = runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
- assertNotNull(ff);
- runner.enqueue(ff);
-
- runner.run(1, false);
- assertEquals(0, runner.getQueueSize().getObjectCount());
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- // 6 sends due to duplication
- verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateDemarcationIntoEmptyMessages() {
- String topicName = "validateDemarcationIntoEmptyMessages";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- final TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(PublishKafka.KEY, "key1");
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
- final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
- runner.enqueue(bytes);
- runner.run(1);
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateComplexRightPartialDemarcatedMessages() {
- String topicName = "validateComplexRightPartialDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
- runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
-
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateComplexLeftPartialDemarcatedMessages() {
- String topicName = "validateComplexLeftPartialDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
- runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0I Mean IT!\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
-
- runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void validateComplexPartialMatchDemarcatedMessages() {
- String topicName = "validateComplexPartialMatchDemarcatedMessages";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0");
-
- runner.enqueue("Hello World\u50e0<\u50e0WILDSTUFF\u50e0>\u50e0Goodbye\u50e0<\u50e0WILDBOOMSTUFF\u50e0>\u50e0".getBytes(StandardCharsets.UTF_8));
- runner.run(1, false);
-
- runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
- Producer<byte[], byte[]> producer = putKafka.getProducer();
- verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
- runner.shutdown();
- }
-
- @Test
- public void validateUtf8Key() {
- String topicName = "validateUtf8Key";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.KEY, "${myKey}");
-
- final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
- runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
- final Map<Object, Object> msgs = putKafka.getMessagesSent();
- assertEquals(1, msgs.size());
- final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
- assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
- }
-
- @Test
- public void validateHexKey() {
- String topicName = "validateHexKey";
- StubPublishKafka putKafka = new StubPublishKafka(100);
- TestRunner runner = TestRunners.newTestRunner(putKafka);
- runner.setProperty(PublishKafka.TOPIC, topicName);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka.KEY_ATTRIBUTE_ENCODING, PublishKafka.HEX_ENCODING);
- runner.setProperty(PublishKafka.KEY, "${myKey}");
-
- final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
- runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
- runner.run(1);
-
- runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
- final Map<Object, Object> msgs = putKafka.getMessagesSent();
- assertEquals(1, msgs.size());
- final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
-
- assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
- }
-}
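
Aside: the deleted tests above validate demarcated publishing by counting producer.send() invocations after a single FlowFile payload is split on a demarcator. The splitting itself is the job of NiFi's StreamDemarcator, whose bug fix is part of this commit. A minimal sketch of the tokenization those times(7) verifications assume (illustration only; it presumes StreamDemarcator's (InputStream, byte[], int) constructor and a nextToken() that returns null at end of stream):

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import org.apache.nifi.stream.io.util.StreamDemarcator;

public class DemarcatorSketch {
    public static void main(String[] args) throws Exception {
        final byte[] payload = "Hello WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8);
        final StreamDemarcator demarcator = new StreamDemarcator(
                new ByteArrayInputStream(payload), "foo".getBytes(StandardCharsets.UTF_8), 8192);
        int tokens = 0;
        while (demarcator.nextToken() != null) {
            tokens++; // each token would become one ProducerRecord
        }
        System.out.println(tokens); // 7 tokens, matching the times(7) verifications above
    }
}
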
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
deleted file mode 100644
index 76c29cd..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Test;
-
-public class PublishingContextTest {
-
- @Test
- public void failInvalidConstructorArgs() {
- try {
- new PublishingContext(null, null);
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
- try {
- new PublishingContext(mock(InputStream.class), null);
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
-
- try {
- new PublishingContext(mock(InputStream.class), "");
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
-
- try {
- new PublishingContext(mock(InputStream.class), "mytopic", -3);
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
- }
-
- @Test
- public void validateFullSetting() {
- PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic", 3);
- publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
- publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
-
- assertEquals("delimiter", new String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
- assertEquals("key", new String(publishingContext.getKeyBytes(), StandardCharsets.UTF_8));
- assertEquals("topic", publishingContext.getTopic());
- assertEquals("topic: 'topic'; delimiter: 'delimiter'", publishingContext.toString());
- }
-
- @Test
- public void validateOnlyOnceSetPerInstance() {
- PublishingContext publishingContext = new PublishingContext(mock(InputStream.class), "topic");
- publishingContext.setKeyBytes(new byte[]{0});
- try {
- publishingContext.setKeyBytes(new byte[]{0});
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
-
- publishingContext.setDelimiterBytes(new byte[]{0});
- try {
- publishingContext.setDelimiterBytes(new byte[]{0});
- fail();
- } catch (IllegalArgumentException e) {
- // success
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
deleted file mode 100644
index 533655e..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class StubPublishKafka extends PublishKafka {
-
- private volatile Producer<byte[], byte[]> producer;
-
- private volatile boolean failed;
-
- private final int ackCheckSize;
-
- private final ExecutorService executor = Executors.newCachedThreadPool();
- private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
-
- StubPublishKafka(int ackCheckSize) {
- this.ackCheckSize = ackCheckSize;
- }
-
- public Producer<byte[], byte[]> getProducer() {
- return producer;
- }
-
- public void destroy() {
- this.executor.shutdownNow();
- }
-
- public Map<Object, Object> getMessagesSent() {
- return new HashMap<>(msgsSent);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
- throws ProcessException {
- final Map<String, String> kafkaProperties = new HashMap<>();
- KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
- kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- KafkaPublisher publisher;
- try {
- Field f = PublishKafka.class.getDeclaredField("brokers");
- f.setAccessible(true);
- f.set(this, context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
- publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
- publisher.setAckWaitTime(15000);
- producer = mock(Producer.class);
-
- this.instrumentProducer(producer, false);
- Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
- kf.setAccessible(true);
- kf.set(publisher, producer);
-
- Field componentLogF = KafkaPublisher.class.getDeclaredField("componentLog");
- componentLogF.setAccessible(true);
- componentLogF.set(publisher, mock(ComponentLog.class));
-
- Field ackCheckSizeField = KafkaPublisher.class.getDeclaredField("ackCheckSize");
- ackCheckSizeField.setAccessible(true);
- ackCheckSizeField.set(publisher, this.ackCheckSize);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IllegalStateException(e);
- }
- return publisher;
- }
-
- @SuppressWarnings("unchecked")
- private void instrumentProducer(Producer<byte[], byte[]> producer, boolean failRandomly) {
-
- when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
- @Override
- public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
- final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
- if (record != null && record.key() != null) {
- msgsSent.put(record.key(), record.value());
- }
-
- // guard against a null record/value so the stub cannot NPE (the check above only guards the key)
- final String value = record == null || record.value() == null ? "" : new String(record.value(), StandardCharsets.UTF_8);
- if ("fail".equals(value) && !StubPublishKafka.this.failed) {
- StubPublishKafka.this.failed = true;
- throw new RuntimeException("intentional");
- }
- Future<RecordMetadata> future = executor.submit(new Callable<RecordMetadata>() {
- @Override
- public RecordMetadata call() throws Exception {
- if ("futurefail".equals(value) && !StubPublishKafka.this.failed) {
- StubPublishKafka.this.failed = true;
- throw new TopicAuthorizationException("Unauthorized");
- } else {
- TopicPartition partition = new TopicPartition("foo", 0);
- RecordMetadata meta = new RecordMetadata(partition, 0, 0);
- return meta;
- }
- }
- });
- return future;
- }
- });
- }
-}
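
Aside: StubPublishKafka above instruments a Mockito mock of Producer so that a message body of "fail" fails synchronously inside send(), while "futurefail" fails asynchronously when the returned Future is resolved. For comparison (not part of this commit), kafka-clients ships a MockProducer that can exercise the same two paths without reflection; a minimal sketch, assuming the 0.9 MockProducer API (autoComplete flag, history(), errorNext()):

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class MockProducerSketch {
    public static void main(String[] args) {
        // autoComplete=false leaves each send's Future pending until the test resolves it
        final MockProducer<byte[], byte[]> producer =
                new MockProducer<>(false, new ByteArraySerializer(), new ByteArraySerializer());

        final Future<RecordMetadata> future = producer.send(new ProducerRecord<>(
                "foo", "key1".getBytes(StandardCharsets.UTF_8), "futurefail".getBytes(StandardCharsets.UTF_8)));

        // simulate the asynchronous failure path: the oldest pending Future completes exceptionally
        producer.errorNext(new TopicAuthorizationException("Unauthorized"));

        System.out.println(producer.history().size()); // 1 recorded send
        // future.get() would now throw ExecutionException wrapping the TopicAuthorizationException
    }
}
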
http://git-wip-us.apache.org/repos/asf/nifi/blob/ed17df50/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..e54a10c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInFlightMessageTracker {
+
+ @Test(timeout = 5000L)
+ public void testAwaitCompletionWhenComplete() throws InterruptedException, TimeoutException {
+ final MockFlowFile flowFile = new MockFlowFile(1L);
+
+ final InFlightMessageTracker tracker = new InFlightMessageTracker();
+ tracker.incrementSentCount(flowFile);
+
+ verifyNotComplete(tracker);
+
+ tracker.incrementSentCount(flowFile);
+ verifyNotComplete(tracker);
+
+ tracker.incrementAcknowledgedCount(flowFile);
+ verifyNotComplete(tracker);
+
+ tracker.incrementAcknowledgedCount(flowFile);
+ tracker.awaitCompletion(1L);
+ }
+
+ @Test(timeout = 5000L)
+ public void testAwaitCompletionWhileWaiting() throws InterruptedException, ExecutionException {
+ final MockFlowFile flowFile = new MockFlowFile(1L);
+
+ final InFlightMessageTracker tracker = new InFlightMessageTracker();
+ tracker.incrementSentCount(flowFile);
+
+ verifyNotComplete(tracker);
+
+ tracker.incrementSentCount(flowFile);
+ verifyNotComplete(tracker);
+
+ final ExecutorService exec = Executors.newFixedThreadPool(1);
+ final Future<?> future = exec.submit(() -> {
+ try {
+ tracker.awaitCompletion(10000L);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ tracker.incrementAcknowledgedCount(flowFile);
+ tracker.incrementAcknowledgedCount(flowFile);
+
+ future.get();
+ }
+
+ private void verifyNotComplete(final InFlightMessageTracker tracker) throws InterruptedException {
+ try {
+ tracker.awaitCompletion(10L);
+ Assert.fail("Expected timeout");
+ } catch (final TimeoutException te) {
+ // expected
+ }
+ }
+
+}
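
Aside: the new test above pins down the contract of the InFlightMessageTracker introduced by this commit: awaitCompletion(millis) must block until the acknowledged count has caught up with the sent count for every tracked FlowFile, and throw TimeoutException if that does not happen in time. A minimal sketch of that contract (an assumption for illustration, not the committed implementation):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.nifi.flowfile.FlowFile;

class InFlightMessageTrackerSketch {
    private final ConcurrentMap<FlowFile, AtomicInteger> sentCounts = new ConcurrentHashMap<>();
    private final ConcurrentMap<FlowFile, AtomicInteger> ackedCounts = new ConcurrentHashMap<>();
    private final Object monitor = new Object();

    void incrementSentCount(final FlowFile flowFile) {
        sentCounts.computeIfAbsent(flowFile, ff -> new AtomicInteger()).incrementAndGet();
    }

    void incrementAcknowledgedCount(final FlowFile flowFile) {
        ackedCounts.computeIfAbsent(flowFile, ff -> new AtomicInteger()).incrementAndGet();
        synchronized (monitor) {
            monitor.notifyAll(); // wake any thread blocked in awaitCompletion
        }
    }

    private boolean isComplete() {
        // complete when every FlowFile has at least as many acks as sends
        return sentCounts.entrySet().stream().allMatch(entry -> {
            final AtomicInteger acked = ackedCounts.get(entry.getKey());
            return acked != null && acked.get() >= entry.getValue().get();
        });
    }

    void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
        final long deadline = System.currentTimeMillis() + millis;
        synchronized (monitor) {
            while (!isComplete()) {
                final long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    throw new TimeoutException();
                }
                monitor.wait(remaining);
            }
        }
    }
}
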