Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 19:49:11 UTC
[2/3] nifi git commit: NIFI-2732 ensure the session and consumer are aligned
and that a rebalance listener is registered. Make consumption far more memory- and
processing-efficient; fix an extraneous getbundled call
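The "rebalance listener" in the commit message refers to the fact that, after this change, each ConsumerLease implements Kafka's ConsumerRebalanceListener and is bound to the NiFi ProcessSession that obtained it (see the ConsumerLease diff further down), so the lease can react when partitions are revoked or assigned instead of committing offsets it may no longer own. Below is a minimal, illustrative sketch of how such a listener is registered through the Kafka 0.9/0.10 consumer API; the callback bodies are placeholders and do not reproduce the real ConsumerLease logic.

    import java.util.Collection;
    import java.util.List;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative only: the real ConsumerLease in this commit implements this interface itself.
    final class RebalanceAwareLease implements ConsumerRebalanceListener {

        RebalanceAwareLease(final Consumer<byte[], byte[]> consumer, final List<String> topics) {
            // registering 'this' as the rebalance listener keeps the lease informed of partition changes
            consumer.subscribe(topics, this);
        }

        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
            // placeholder: flush/commit whatever the bound ProcessSession has accumulated
            // before ownership of these partitions moves to another consumer
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
            // placeholder: reset per-partition bookkeeping for the newly assigned partitions
        }
    }
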
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index a85563d..6fd9053 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,104 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
- static class MockConsumerPool extends ConsumerPool {
-
- final int actualMaxLeases;
- final List<String> actualTopics;
- final Map<String, String> actualKafkaProperties;
- boolean throwKafkaExceptionOnPoll = false;
- boolean throwKafkaExceptionOnCommit = false;
- Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
- Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
- Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
- boolean wasConsumerLeasePoisoned = false;
- boolean wasConsumerLeaseClosed = false;
- boolean wasPoolClosed = false;
-
- public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
- super(maxLeases, topics, kafkaProperties, null);
- actualMaxLeases = maxLeases;
- actualTopics = topics;
- actualKafkaProperties = kafkaProperties;
- }
-
- @Override
- public ConsumerLease obtainConsumer() {
- return new ConsumerLease() {
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
- if (throwKafkaExceptionOnPoll) {
- throw new KafkaException("i planned to fail");
- }
- final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
- return (records == null) ? ConsumerRecords.empty() : records;
- }
-
- @Override
- public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
- if (throwKafkaExceptionOnCommit) {
- throw new KafkaException("i planned to fail");
- }
- actualCommitOffsets = offsets;
- }
-
- @Override
- public void poison() {
- wasConsumerLeasePoisoned = true;
- }
-
- @Override
- public void close() {
- wasConsumerLeaseClosed = true;
- }
- };
- }
-
- @Override
- public void close() {
- wasPoolClosed = true;
- }
-
- void resetState() {
- throwKafkaExceptionOnPoll = false;
- throwKafkaExceptionOnCommit = false;
- nextPlannedRecordsQueue = null;
- nextExpectedCommitOffsets = null;
- wasConsumerLeasePoisoned = false;
- wasConsumerLeaseClosed = false;
- wasPoolClosed = false;
- }
+ Consumer<byte[], byte[]> mockConsumer = null;
+ ConsumerLease mockLease = null;
+ ConsumerPool mockConsumerPool = null;
+ @Before
+ public void setup() {
+ mockConsumer = mock(Consumer.class);
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@@ -174,31 +106,14 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -207,69 +122,29 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- } else {
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- }
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
@Test
- public void validateGetLotsOfMessages() throws Exception {
- String groupName = "validateGetLotsOfMessages";
-
- final byte[][] firstPassValues = new byte[10010][1];
- for (final byte[] value : firstPassValues) {
- value[0] = 0x12;
- }
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+ public void validateGetErrorMessages() throws Exception {
+ String groupName = "validateGetErrorMessages";
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(true, false);
+ when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -278,352 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
- assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
- assertEquals(1, mockPool.actualCommitOffsets.size());
- assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(2)).continuePolling();
+ verify(mockLease, times(1)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
- for (final byte[] rawRecord : rawRecords) {
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
-
- for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
- final byte[] key = entry.getKey();
- final byte[] rawRecord = entry.getValue();
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- for (final ConsumerRecords<byte[], byte[]> rec : records) {
- rec.partitions().stream().forEach((part) -> {
- final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
- if (map.get(part) != null) {
- throw new IllegalStateException("already have that topic/partition in the record map");
- }
- map.put(part, conRecs);
- });
- }
- return new ConsumerRecords<>(map);
- }
-
- @Test
- public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
- String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues),
- createConsumerRecords("bar", 1, 1L, secondPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(2, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- }
-
- @Test
- public void validatePollException() throws Exception {
- String groupName = "validatePollException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnPoll = true;
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(0, flowFiles.size());
- assertNull(null, mockPool.actualCommitOffsets);
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateCommitOffsetException() throws Exception {
- String groupName = "validateCommitOffsetException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnCommit = true;
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(1, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertNull(null, mockPool.actualCommitOffsets);
- }
-
- @Test
- public void validateUtf8Key() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateHexKey() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
}
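
The rewritten ConsumeKafkaTest above drops the hand-rolled MockConsumerPool in favor of Mockito mocks and asserts only the interaction contract between the processor and its lease: how many times continuePolling(), poll(), commit(), and close() are invoked for a given stubbed sequence. A self-contained sketch of that stub-then-verify pattern, using plain Mockito against an assumed Lease interface that mirrors ConsumerLease:

    import static org.mockito.Mockito.*;

    // assumed, simplified stand-in for ConsumerLease
    interface Lease extends AutoCloseable {
        boolean continuePolling();
        void poll();
        boolean commit();
        void close();   // narrow AutoCloseable's close() so callers need not handle Exception
    }

    public class LeaseInteractionSketch {
        public static void main(String[] args) {
            final Lease lease = mock(Lease.class);
            // stub: two productive polls, then stop; commit succeeds
            when(lease.continuePolling()).thenReturn(true, true, false);
            when(lease.commit()).thenReturn(true);

            try (Lease l = lease) {                 // close() is verified below
                while (l.continuePolling()) {
                    l.poll();
                }
                l.commit();
            }

            verify(lease, times(3)).continuePolling();
            verify(lease, times(2)).poll();
            verify(lease, times(1)).commit();
            verify(lease, times(1)).close();
            verifyNoMoreInteractions(lease);
        }
    }

Driving the loop through a narrow interface like this keeps the test independent of Kafka itself, which is exactly what the new tests do by stubbing ConsumerPool.obtainConsumer(...) to hand back a mocked lease.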
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
+ ProcessSession mockSession = null;
+ ProvenanceReporter mockReporter = null;
+ ConsumerPool testPool = null;
+ ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
- }
-
- @Test
- public void validatePoolSimpleCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+ mockSession = mock(ProcessSession.class);
+ mockReporter = mock(ProvenanceReporter.class);
+ when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+ testPool = new ConsumerPool(
+ 1,
+ null,
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
+ testDemarcatedPool = new ConsumerPool(
+ 1,
+ "--demarcator--".getBytes(StandardCharsets.UTF_8),
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer;
+ }
+ };
+ }
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ @Test
+ public void validatePoolSimpleCreateClose() throws Exception {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
- lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
- assertEquals(1, stats.leasesObtainedCount);
- assertEquals(1, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
+ assertEquals(4, stats.leasesObtainedCount);
}
@Test
- public void validatePoolSimpleBatchCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolSimpleCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ lease.commit();
+ }
+ testPool.close();
+ verify(mockSession, times(3)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ @Test
+ public void validatePoolSimpleBatchCreateClose() throws Exception {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
- lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
- assertEquals(10000, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
@Test
- public void validatePoolConsumerFails() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolBatchCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
- fail();
- } catch (final KafkaException ke) {
+ lease.commit();
+ }
+ testDemarcatedPool.close();
+ verify(mockSession, times(1)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testDemarcatedPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+
+ @Test
+ public void validatePoolConsumerFails() throws Exception {
+
+ when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ try {
+ lease.poll();
+ fail();
+ } catch (final KafkaException ke) {
+ }
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
- assertEquals(0, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+ final TopicPartition tPart = new TopicPartition(topic, partition);
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ long offset = startingOffset;
+ for (final byte[] rawRecord : rawRecords) {
+ final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+ records.add(rec);
+ }
+ map.put(tPart, records);
+ return new ConsumerRecords(map);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af0d343..b3f1bd1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -358,7 +358,7 @@ public class PublishKafkaTest {
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+ runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
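
The only functional change in this test is that the hex key-encoding allowable value now lives in KafkaProcessorUtils rather than on the processor itself. As background, the encoding in question is plain uppercase hexadecimal of the raw key bytes, which is why the test supplies "6B657931" for a key of "key1". A minimal sketch using javax.xml.bind.DatatypeConverter, the same utility the 0.9 processor's removed encodeKafkaKey method used:

    import java.nio.charset.StandardCharsets;
    import javax.xml.bind.DatatypeConverter;

    public class HexKeySketch {
        public static void main(String[] args) {
            final byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
            // uppercase hex of the raw key bytes -> "6B657931"
            final String hex = DatatypeConverter.printHexBinary(key);
            System.out.println(hex);
            // and back again
            final byte[] roundTrip = DatatypeConverter.parseHexBinary(hex);
            System.out.println(new String(roundTrip, StandardCharsets.UTF_8)); // key1
        }
    }
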
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 0a3fe5d..0b8a752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -25,13 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -53,17 +46,18 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time"
+ " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
-@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9"})
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
- + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+ + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
public class ConsumeKafka extends AbstractProcessor {
- private static final long TWO_MB = 2L * 1024L * 1024L;
-
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
- "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
+
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+ .name("max-uncommit-offset-wait")
+ .displayName("Max Uncommitted Time")
+ .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ + "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ + "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ + "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ + "than when we're not as there is much less for us to keep track of in memory.")
+ .required(false)
+ .defaultValue("1 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka extends AbstractProcessor {
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
- private volatile byte[] demarcatorBytes = null;
private volatile ConsumerPool consumerPool = null;
static {
@@ -165,6 +167,7 @@ public class ConsumeKafka extends AbstractProcessor {
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(MAX_POLL_RECORDS);
+ descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
}
@@ -179,16 +182,8 @@ public class ConsumeKafka extends AbstractProcessor {
return DESCRIPTORS;
}
- @OnScheduled
- public void prepareProcessing(final ProcessContext context) {
- this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
- ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
- : null;
- }
-
@OnStopped
public void close() {
- demarcatorBytes = null;
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka extends AbstractProcessor {
return pool;
}
+ return consumerPool = createConsumerPool(context, getLogger());
+ }
+
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ final int maxLeases = context.getMaxConcurrentTasks();
+ final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+ final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
+ ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+ : null;
final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
- final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ final String topicListing = context.getProperty(ConsumeKafka.TOPICS).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -225,213 +232,40 @@ public class ConsumeKafka extends AbstractProcessor {
topics.add(trimmedName);
}
}
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
- }
+ final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+ final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return new ConsumerPool(maxLeases, topics, props, log);
+ return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- final long startTimeNanos = System.nanoTime();
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
- try (final ConsumerLease lease = pool.obtainConsumer()) {
- try {
- if (lease == null) {
- context.yield();
- return;
- }
- final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
- if (!foundData) {
- session.rollback();
- return;
- }
-
- writeSessionData(context, session, partitionRecordMap, startTimeNanos);
- //At-least once commit handling (if order is reversed it is at-most once)
- session.commit();
- commitOffsets(lease, partitionRecordMap);
- } catch (final KafkaException ke) {
- lease.poison();
- getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+ try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+ if (lease == null) {
context.yield();
- session.rollback();
- }
- }
- }
-
- private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
- final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach((entry) -> {
- long maxOffset = entry.getValue().stream()
- .mapToLong(record -> record.offset())
- .max()
- .getAsLong();
- partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
- });
- lease.commitOffsets(partOffsetMap);
- }
-
- private void writeSessionData(
- final ProcessContext context, final ProcessSession session,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
- final long startTimeNanos) {
- if (demarcatorBytes != null) {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach(entry -> {
- writeData(context, session, entry.getValue(), startTimeNanos);
- });
- } else {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .flatMap(entry -> entry.getValue().stream())
- .forEach(record -> {
- writeData(context, session, Collections.singletonList(record), startTimeNanos);
- });
- }
- }
-
- private String encodeKafkaKey(final byte[] key, final String encoding) {
- if (key == null) {
- return null;
- }
-
- if (HEX_ENCODING.getValue().equals(encoding)) {
- return DatatypeConverter.printHexBinary(key);
- } else if (UTF8_ENCODING.getValue().equals(encoding)) {
- return new String(key, StandardCharsets.UTF_8);
- } else {
- return null; // won't happen because it is guaranteed by the Allowable Values
- }
- }
-
- private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
- final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
- final String offset = String.valueOf(firstRecord.offset());
- final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
- final String topic = firstRecord.topic();
- final String partition = String.valueOf(firstRecord.partition());
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, out -> {
- boolean useDemarcator = false;
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- if (useDemarcator) {
- out.write(demarcatorBytes);
- }
- out.write(record.value());
- useDemarcator = true;
+ return;
}
- });
- final Map<String, String> kafkaAttrs = new HashMap<>();
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
- if (keyValue != null && records.size() == 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
- }
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
- if (records.size() > 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
- }
- flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
- final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(
- context.getProperty(SECURITY_PROTOCOL).getValue(),
- context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
- topic);
- session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
- this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
- new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
- session.transfer(flowFile, REL_SUCCESS);
- }
-
- /**
- * Populates the given partitionRecordMap with new records until we poll
- * that returns no records or until we have enough data. It is important to
- * ensure we keep items grouped by their topic and partition so that when we
- * bundle them we bundle them intelligently and so that we can set offsets
- * properly even across multiple poll calls.
- */
- private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
- final long startNanos = System.nanoTime();
- boolean foundData = false;
- ConsumerRecords<byte[], byte[]> records;
- final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-
- do {
- records = lease.poll();
-
- for (final TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
- if (currList == null) {
- currList = new ArrayList<>();
- partitionRecordMap.put(partition, currList);
+ try {
+ while (this.isScheduled() && lease.continuePolling()) {
+ lease.poll();
}
- currList.addAll(records.records(partition));
- if (currList.size() > 0) {
- foundData = true;
+ if (this.isScheduled() && !lease.commit()) {
+ context.yield();
}
+ } catch (final KafkaException kex) {
+ getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+ new Object[]{lease, kex}, kex);
+ } catch (final Throwable t) {
+ getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+ new Object[]{lease, t}, t);
}
- //If we received data and we still want to get more
- } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
- return foundData;
- }
-
- /**
- * Determines if we have enough data as-is and should move on.
- *
- * @return true if we've been gathering for more than 500 ms or if we're
- * demarcating and have more than 50 flowfiles worth or if we're per message
- * and have more than 2000 flowfiles or if totalMessageSize is greater than
- * two megabytes; false otherwise
- *
- * Implementation note: 500 millis and 5 MB are magic numbers. These may
- * need to be tuned. They get at how often offsets will get committed to
- * kafka relative to how many records will get buffered into memory in a
- * poll call before writing to repos.
- */
- private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) {
-
- final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
- if (durationMillis > 500) {
- return true;
- }
-
- int topicPartitionsFilled = 0;
- int totalRecords = 0;
- long totalRecordSize = 0;
-
- for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
- if (!recordList.isEmpty()) {
- topicPartitionsFilled++;
- }
- totalRecords += recordList.size();
- for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
- totalRecordSize += rec.value().length;
- }
- }
-
- if (demarcatorBytes != null && demarcatorBytes.length > 0) {
- return topicPartitionsFilled > 50;
- } else if (totalRecordSize > TWO_MB) {
- return true;
- } else {
- return totalRecords > maxRecords;
}
}
-
}
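
For readers tracking the new onTrigger flow above, here is a minimal sketch of the full lease lifecycle the hunk implies. Obtaining and closing the lease (the consumerPool.obtainConsumer call and the finally block) are assumptions inferred from the new ConsumerLease class below, not lines from this commit.

    // Sketch only: how the poll/commit loop above plausibly sits inside onTrigger.
    final ConsumerLease lease = consumerPool.obtainConsumer(session); // hypothetical pool call
    if (lease == null) {
        context.yield();
        return;
    }
    try {
        while (this.isScheduled() && lease.continuePolling()) {
            lease.poll();                      // writes records into the lease's session
        }
        if (this.isScheduled() && !lease.commit()) {
            context.yield();                   // nothing was committed, so back off briefly
        }
    } catch (final KafkaException kex) {
        getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                new Object[]{lease, kex}, kex);
    } finally {
        lease.close();                         // the pool reuses or destroys the consumer
    }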
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..5b8ba1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread a time.
*/
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+ private final long maxWaitMillis;
+ private final Consumer<byte[], byte[]> kafkaConsumer;
+ private final ComponentLog logger;
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
+ private boolean poisoned = false;
+ //used for tracking demarcated flowfiles to their TopicPartition so we can append
+ //to them on subsequent poll calls
+ private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+ private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+ private long leaseStartNanos = -1;
+ private boolean lastPollEmpty = false;
+ private int totalFlowFiles = 0;
+
+ ConsumerLease(
+ final long maxWaitMillis,
+ final Consumer<byte[], byte[]> kafkaConsumer,
+ final byte[] demarcatorBytes,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger) {
+ this.maxWaitMillis = maxWaitMillis;
+ this.kafkaConsumer = kafkaConsumer;
+ this.demarcatorBytes = demarcatorBytes;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.logger = logger;
+ }
+
+ /**
+ * clears out internal state elements excluding session and consumer as
+ * those are managed by the pool itself
+ */
+ private void resetInternalState() {
+ bundleMap.clear();
+ uncommittedOffsetsMap.clear();
+ leaseStartNanos = -1;
+ lastPollEmpty = false;
+ totalFlowFiles = 0;
+ }
+
+ /**
+ * Kafka will call this method whenever it is about to rebalance the
+ * consumers for the given partitions. We'll simply take this to mean that
+ * we need to quickly commit what we've got and will return the consumer to
+ * the pool. This method will be called during the poll() method call of
+ * this class and will be called by the same thread calling poll according
+ * to the Kafka API docs. After this method executes the session and kafka
+ * offsets are committed and this lease is closed.
+ *
+ * @param partitions partitions being reassigned
+ */
+ @Override
+ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns parittion
+ commit();
+ }
/**
- * Executes a poll on the underlying Kafka Consumer.
+ * This will be called by Kafka when the rebalance has completed. We don't
+ * need to do anything with this information other than optionally log it,
+ * since by this point we've already committed what we had and moved on.
*
- * @return ConsumerRecords retrieved in the poll.
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * @param partitions topic partition set being reassigned
*/
- ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+ @Override
+ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ }
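
Because the lease itself is the ConsumerRebalanceListener, the two callbacks above only fire if the lease is passed in when the consumer subscribes. A one-line sketch of that registration, assuming the subscription happens in the pool (the actual wiring lives in ConsumerPool elsewhere in this commit):

    // Sketch: subscribe with the lease as the rebalance listener so that
    // onPartitionsRevoked/onPartitionsAssigned run inside this lease's own poll() calls.
    kafkaConsumer.subscribe(topics, lease); // topics is the configured List<String>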
+
+ /**
+ * Executes a poll on the underlying Kafka Consumer and creates any new
+ * flowfiles necessary or appends to existing ones if in demarcation mode.
+ */
+ void poll() {
+ /**
+ * Implementation note: If we take too long (30 secs?) between a kafka
+ * poll call and our own record processing before any subsequent poll
+ * call or the commit, we can run into a situation where the commit will
+ * succeed against the session but fail on committing offsets. This is
+ * apparently different than the Kafka scenario of electing to rebalance
+ * for other reasons, but in this case it is due to a session timeout. It
+ * appears Kafka KIP-62 aims to offer more control over the meaning of
+ * various timeouts. If we do run into this case it could result in
+ * duplicates.
+ */
+ try {
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ lastPollEmpty = records.count() == 0;
+ processRecords(records);
+ } catch (final Throwable t) {
+ this.poison();
+ throw t;
+ }
+ }
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
* kafka client to collect more data from Kafka before committing the
* offsets.
*
- * @param offsets offsets
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * @return false if we didn't do anything and the caller should probably
+ * yield; true if we committed new data
+ */
+ boolean commit() {
+ if (uncommittedOffsetsMap.isEmpty()) {
+ resetInternalState();
+ return false;
+ }
+ try {
+ /**
+ * Committing the nifi session then the offsets means we have an at
+ * least once guarantee here. If we reversed the order we'd have at
+ * most once.
+ */
+ final Collection<FlowFile> bundledFlowFiles = getBundles();
+ if (!bundledFlowFiles.isEmpty()) {
+ getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+ }
+ getProcessSession().commit();
+ kafkaConsumer.commitSync(uncommittedOffsetsMap);
+ resetInternalState();
+ return true;
+ } catch (final KafkaException kex) {
+ poison();
+ logger.warn("Duplicates are likely as we were able to commit the process"
+ + " session but received an exception from Kafka while committing"
+ + " offsets.");
+ throw kex;
+ } catch (final Throwable t) {
+ poison();
+ throw t;
+ }
+ }
+
+ /**
+ * Indicates whether we should continue polling for data. If we are not
+ * writing data with a demarcator then we're writing individual flow files
+ * per kafka message, so we must be very mindful of memory usage for the
+ * flow file objects (not their content) being held in memory. The content
+ * of kafka messages will be written to the content repository immediately
+ * upon each poll call, but we must still be mindful of how much memory can
+ * be used in each poll call. We will indicate that we should stop polling
+ * if our last poll call produced no new results, if we've been polling and
+ * processing data longer than the specified maximum polling time, if we
+ * have reached our specified max flow file limit, or if a rebalance has
+ * been initiated for one of the partitions we're watching.
+ *
+ * @return true if should keep polling; false otherwise
+ */
+ boolean continuePolling() {
+ //stop if the last poll produced no new data
+ if (lastPollEmpty) {
+ return false;
+ }
+
+ //stop if we've gone past our desired max uncommitted wait time
+ if (leaseStartNanos < 0) {
+ leaseStartNanos = System.nanoTime();
+ }
+ final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ if (durationMillis > maxWaitMillis) {
+ return false;
+ }
+
+ //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+ if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+ return false;
+ } else {
+ return totalFlowFiles < 15000; //admittedly a magic number - good candidate for processor property
+ }
+ }
+
+ /**
+ * Indicates that the underlying session and consumer should be immediately
+ * considered invalid. Once closed the session will be rolled back and the
+ * pool should destroy the underlying consumer. This is useful if, due to
+ * external reasons such as the processor no longer being scheduled, this
+ * lease should be terminated immediately.
*/
- void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+ private void poison() {
+ poisoned = true;
+ }
/**
- * Notifies that this lease is poisoned and should not be reused.
+ * @return true if this lease has been poisoned; false otherwise
*/
- void poison();
+ boolean isPoisoned() {
+ return poisoned;
+ }
/**
- * Notifies that this lease is to be returned. The pool may optionally reuse
- * this lease with another client. No further references by the caller
- * should occur after calling close.
+ * This method is intended to be overridden by the pool that created this
+ * ConsumerLease object. It should ensure that the session given to create
+ * this lease is rolled back and that the underlying kafka consumer is
+ * either returned to the pool for continued use or destroyed if this
+ * lease has been poisoned. It can only be called once. Calling it more
+ * than once can result in undefined and non-threadsafe behavior.
*/
@Override
- void close();
+ public void close() {
+ resetInternalState();
+ }
+
+ public abstract ProcessSession getProcessSession();
+
+ private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+ records.partitions().stream().forEach(partition -> {
+ List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+ if (!messages.isEmpty()) {
+ //update maximum offset map for this topic partition
+ long maxOffset = messages.stream()
+ .mapToLong(record -> record.offset())
+ .max()
+ .getAsLong();
+ uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+ //write records to content repository and session
+ if (demarcatorBytes == null) {
+ totalFlowFiles += messages.size();
+ messages.stream().forEach(message -> {
+ writeData(getProcessSession(), message, partition);
+ });
+ } else {
+ writeData(getProcessSession(), messages, partition);
+ }
+ }
+ });
+ }
+
+ private static String encodeKafkaKey(final byte[] key, final String encoding) {
+ if (key == null) {
+ return null;
+ }
+
+ if (HEX_ENCODING.getValue().equals(encoding)) {
+ return DatatypeConverter.printHexBinary(key);
+ } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+ return new String(key, StandardCharsets.UTF_8);
+ } else {
+ return null; // won't happen because it is guaranteed by the Allowable Values
+ }
+ }
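
To illustrate the two encodings handled above, a short sketch using the StandardCharsets and DatatypeConverter imports already added in this diff; the key value is invented for illustration:

    final byte[] key = "order-42".getBytes(StandardCharsets.UTF_8); // hypothetical Kafka key
    DatatypeConverter.printHexBinary(key);         // "6F726465722D3432" for the hex key-encoding value
    new String(key, StandardCharsets.UTF_8);       // "order-42" for the UTF-8 key-encoding value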
+
+ private Collection<FlowFile> getBundles() {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ for (final BundleTracker tracker : bundleMap.values()) {
+ populateAttributes(tracker);
+ flowFiles.add(tracker.flowFile);
+ }
+ return flowFiles;
+ }
+
+ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+ FlowFile flowFile = session.create();
+ final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+ tracker.incrementRecordCount(1);
+ flowFile = session.write(flowFile, out -> {
+ out.write(record.value());
+ });
+ tracker.updateFlowFile(flowFile);
+ populateAttributes(tracker);
+ session.transfer(tracker.flowFile, REL_SUCCESS);
+ }
+
+ private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+ final boolean demarcateFirstRecord;
+ BundleTracker tracker = bundleMap.get(topicPartition);
+ FlowFile flowFile;
+ if (tracker == null) {
+ tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+ flowFile = session.create();
+ tracker.updateFlowFile(flowFile);
+ demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+ } else {
+ demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+ }
+ flowFile = tracker.flowFile;
+ tracker.incrementRecordCount(records.size());
+ flowFile = session.append(flowFile, out -> {
+ boolean useDemarcator = demarcateFirstRecord;
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ if (useDemarcator) {
+ out.write(demarcatorBytes);
+ }
+ out.write(record.value());
+ useDemarcator = true;
+ }
+ });
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(topicPartition, tracker);
+ }
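
To make the demarcation behavior concrete, here is a standalone sketch of the append logic above; the record values and demarcator are invented and nothing here is part of the commit.

    import java.util.Arrays;
    import java.util.List;

    public class DemarcatorSketch {
        public static void main(String[] args) {
            final String demarcator = "|";
            final List<String> records = Arrays.asList("a", "b", "c");
            final StringBuilder content = new StringBuilder();
            boolean useDemarcator = false; // the first record of a fresh bundle is not prefixed
            for (final String record : records) {
                if (useDemarcator) {
                    content.append(demarcator);
                }
                content.append(record);
                useDemarcator = true;
            }
            System.out.println(content); // prints a|b|c
        }
    }

On a later poll for the same TopicPartition the existing BundleTracker is found, so demarcateFirstRecord is true and the next append begins with the demarcator, extending the same FlowFile content.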
+
+ private void populateAttributes(final BundleTracker tracker) {
+ final Map<String, String> kafkaAttrs = new HashMap<>();
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ if (tracker.key != null && tracker.totalRecords == 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+ if (tracker.totalRecords > 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ }
+ final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+ final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+ getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+ tracker.updateFlowFile(newFlowFile);
+ }
+
+ private static class BundleTracker {
+
+ final long initialOffset;
+ final int partition;
+ final String topic;
+ final String key;
+ FlowFile flowFile;
+ long totalRecords = 0;
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this.initialOffset = initialRecord.offset();
+ this.partition = topicPartition.partition();
+ this.topic = topicPartition.topic();
+ this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+ }
+
+ private void incrementRecordCount(final long count) {
+ totalRecords += count;
+ }
+
+ private void updateFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ }
+
+ }
}
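
Since ConsumerLease is now abstract, the pool that creates it must supply the ProcessSession and decide whether to reuse or destroy the consumer on close. The following is one plausible shape for that subclass, inferred from the abstract getProcessSession() and the close() contract above; the real ConsumerPool in this commit may differ, and returnConsumerToPool is a hypothetical helper.

    // Sketch: an anonymous subclass the pool could hand out (same package assumed).
    final ConsumerLease lease = new ConsumerLease(
            maxWaitMillis, consumer, demarcatorBytes, keyEncoding,
            securityProtocol, bootstrapServers, logger) {

        @Override
        public ProcessSession getProcessSession() {
            return session; // the session the pool bound to this lease
        }

        @Override
        public void close() {
            super.close();                      // clears bundle and offset state
            session.rollback();                 // discard anything not already committed
            if (isPoisoned()) {
                consumer.close();               // destroy a poisoned consumer
            } else {
                returnConsumerToPool(consumer); // hypothetical pool helper for reuse
            }
        }
    };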