You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2016/09/06 19:49:10 UTC
[1/3] nifi git commit: NIFI-2732 ensure session and consumer aligned
and has registered rebalance listener. Make consumption far more memory and
process efficient, fixed extraneous getbundled call
Repository: nifi
Updated Branches:
refs/heads/master 088125451 -> 7a451935a
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
-import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
/**
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
*/
public class ConsumerPool implements Closeable {
- private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
- private final int maxLeases;
- private final Queue<ConsumerLease> consumerLeases;
+ private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics;
private final Map<String, Object> kafkaProperties;
+ private final long maxWaitMillis;
private final ComponentLog logger;
-
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
- private final AtomicLong productivePollCountRef = new AtomicLong();
- private final AtomicLong unproductivePollCountRef = new AtomicLong();
/**
* Creates a pool of KafkaConsumer objects that will grow up to the maximum
- * indicated leases. Consumers are lazily initialized.
+ * indicated number of concurrent leases. Consumers are lazily
+ * initialized. In the future the pool may elect not to create the maximum
+ * number of configured consumers if the broker-reported lag time for all
+ * topics is below a certain threshold.
*
- * @param maxLeases maximum number of active leases in the pool
- * @param topics the topics to consume from
- * @param kafkaProperties the properties for each consumer
+ * @param maxConcurrentLeases max allowable consumers at once
+ * @param demarcator bytes to use as demarcator between messages; null or
+ * empty means no demarcator
+ * @param kafkaProperties properties to use to initialize kafka consumers
+ * @param topics the topics to subscribe to
+ * @param maxWaitMillis maximum time to wait for a given lease to acquire
+ * data before committing
+ * @param keyEncoding the encoding to use for the key of a kafka message if
+ * found
+ * @param securityProtocol the security protocol used
+ * @param bootstrapServers the bootstrap servers
* @param logger the logger to report any errors/warnings
*/
- public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
- this.maxLeases = maxLeases;
- if (maxLeases <= 0) {
- throw new IllegalArgumentException("Max leases value must be greather than zero.");
- }
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final byte[] demarcator,
+ final Map<String, String> kafkaProperties,
+ final List<String> topics,
+ final long maxWaitMillis,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
- if (topics == null || topics.isEmpty()) {
- throw new IllegalArgumentException("Must have a list of one or more topics");
- }
- this.topics = topics;
- this.kafkaProperties = new HashMap<>(kafkaProperties);
- this.consumerLeases = new ArrayDeque<>();
+ this.demarcatorBytes = demarcator;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = Collections.unmodifiableList(topics);
}
/**
- * Obtains a consumer from the pool if one is available
+ * Obtains a consumer from the pool if one is available or lazily
+ * initializes a new one if deemed necessary.
*
- * @return consumer from the pool
- * @throws IllegalArgumentException if pool already contains
+ * @param session the session with which the consumer lease will be
+ * associated
+ * @return consumer to use or null if not available or necessary
*/
- public ConsumerLease obtainConsumer() {
- final ConsumerLease lease;
- final int activeLeases;
- synchronized (this) {
- lease = consumerLeases.poll();
- activeLeases = activeLeaseCount.get();
- }
- if (lease == null && activeLeases >= maxLeases) {
- logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
- return null;
+ public ConsumerLease obtainConsumer(final ProcessSession session) {
+ SimpleConsumerLease lease = pooledLeases.poll();
+ if (lease == null) {
+ final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+ consumerCreatedCountRef.incrementAndGet();
+ /**
+ * For now return a new consumer lease. But we could later elect to
+ * have this return null if we determine the broker indicates that
+ * the lag time on all topics being monitored is sufficiently low.
+ * For now we should encourage conservative use of threads, because
+ * having too many means that at best we'll have useless threads sitting
+ * around making frequent network calls, and at worst we'll have idle
+ * consumers, which could prompt excessive rebalances.
+ */
+ lease = new SimpleConsumerLease(consumer);
+ /**
+ * This subscription tightly couples the lease to the given
+ * consumer. They cannot be separated from then on.
+ */
+ consumer.subscribe(topics, lease);
}
+ lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet();
- return (lease == null) ? createConsumer() : lease;
+ return lease;
}
+ /**
+ * Exposed as protected method for easier unit testing
+ *
+ * @return consumer
+ * @throws KafkaException if the Kafka consumer cannot be constructed
+ */
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
- private ConsumerLease createConsumer() {
- final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
- consumerCreatedCountRef.incrementAndGet();
- try {
- kafkaConsumer.subscribe(topics);
- } catch (final KafkaException kex) {
- try {
- kafkaConsumer.close();
- consumerClosedCountRef.incrementAndGet();
- } catch (final Exception ex) {
- consumerClosedCountRef.incrementAndGet();
- //ignore
- }
- throw kex;
- }
-
- final ConsumerLease lease = new ConsumerLease() {
-
- private volatile boolean poisoned = false;
- private volatile boolean closed = false;
-
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
-
- if (poisoned) {
- throw new KafkaException("The consumer is poisoned and should no longer be used");
- }
-
- try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
- if (records.isEmpty()) {
- unproductivePollCountRef.incrementAndGet();
- } else {
- productivePollCountRef.incrementAndGet();
- }
- return records;
- } catch (final KafkaException kex) {
- logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
- poison();
- close();
- throw kex;
- }
- }
-
- @Override
- public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
- if (poisoned) {
- throw new KafkaException("The consumer is poisoned and should no longer be used");
- }
- try {
- kafkaConsumer.commitSync(offsets);
- } catch (final KafkaException kex) {
- logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
- poison();
- close();
- throw kex;
- }
- }
-
- @Override
- public void close() {
- if (closed) {
- return;
- }
- if (poisoned || activeLeaseCount.get() > maxLeases) {
- closeConsumer(kafkaConsumer);
- activeLeaseCount.decrementAndGet();
- closed = true;
- } else {
- final boolean added;
- synchronized (ConsumerPool.this) {
- added = consumerLeases.offer(this);
- }
- if (!added) {
- closeConsumer(kafkaConsumer);
- activeLeaseCount.decrementAndGet();
- }
- }
- }
-
- @Override
- public void poison() {
- poisoned = true;
- }
- };
- activeLeaseCount.incrementAndGet();
- return lease;
- }
-
/**
- * Closes all consumers in the pool. Can be safely recalled.
+ * Closes all consumers in the pool. Can be safely called repeatedly.
*/
@Override
public void close() {
- final List<ConsumerLease> leases = new ArrayList<>();
- synchronized (this) {
- ConsumerLease lease = null;
- while ((lease = consumerLeases.poll()) != null) {
- leases.add(lease);
- }
- }
- for (final ConsumerLease lease : leases) {
- lease.poison();
- lease.close();
- }
+ final List<SimpleConsumerLease> leases = new ArrayList<>();
+ pooledLeases.drainTo(leases);
+ leases.stream().forEach((lease) -> {
+ lease.close(true);
+ });
}
private void closeConsumer(final Consumer consumer) {
+ consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
} catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
try {
consumer.close();
- consumerClosedCountRef.incrementAndGet();
} catch (Exception e) {
- consumerClosedCountRef.incrementAndGet();
logger.warn("Failed while closing " + consumer, e);
}
}
PoolStats getPoolStats() {
- return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+ return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+ }
+
+ private class SimpleConsumerLease extends ConsumerLease {
+
+ private final Consumer<byte[], byte[]> consumer;
+ private volatile ProcessSession session;
+ private volatile boolean closedConsumer;
+
+ private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+ super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+ this.consumer = consumer;
+ }
+
+ void setProcessSession(final ProcessSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public ProcessSession getProcessSession() {
+ return session;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ close(false);
+ }
+
+ public void close(final boolean forceClose) {
+ if (closedConsumer) {
+ return;
+ }
+ super.close();
+ if (session != null) {
+ session.rollback();
+ setProcessSession(null);
+ }
+ if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+ closedConsumer = true;
+ closeConsumer(consumer);
+ }
+ }
}
static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
final long consumerCreatedCount;
final long consumerClosedCount;
final long leasesObtainedCount;
- final long productivePollCount;
- final long unproductivePollCount;
PoolStats(
final long consumerCreatedCount,
final long consumerClosedCount,
- final long leasesObtainedCount,
- final long productivePollCount,
- final long unproductivePollCount
+ final long leasesObtainedCount
) {
this.consumerCreatedCount = consumerCreatedCount;
this.consumerClosedCount = consumerClosedCount;
this.leasesObtainedCount = leasesObtainedCount;
- this.productivePollCount = productivePollCount;
- this.unproductivePollCount = unproductivePollCount;
}
@Override
public String toString() {
return "Created Consumers [" + consumerCreatedCount + "]\n"
+ "Closed Consumers [" + consumerClosedCount + "]\n"
- + "Leases Obtained [" + leasesObtainedCount + "]\n"
- + "Productive Polls [" + productivePollCount + "]\n"
- + "Unproductive Polls [" + unproductivePollCount + "]\n";
+ + "Leases Obtained [" + leasesObtainedCount + "]\n";
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index c74ad18..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+ static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+ static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+ "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
static final String KAFKA_KEY = "kafka.key";
@@ -96,7 +100,6 @@ final class KafkaProcessorUtils {
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(false)
.build();
-
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
.displayName("SSL Context Service")
@@ -227,7 +230,6 @@ final class KafkaProcessorUtils {
mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
}
}
-
String pName = propertyDescriptor.getName();
String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7e4b12c..8e3fa3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,105 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
-
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
- static class MockConsumerPool extends ConsumerPool {
-
- final int actualMaxLeases;
- final List<String> actualTopics;
- final Map<String, String> actualKafkaProperties;
- boolean throwKafkaExceptionOnPoll = false;
- boolean throwKafkaExceptionOnCommit = false;
- Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
- Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
- Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
- boolean wasConsumerLeasePoisoned = false;
- boolean wasConsumerLeaseClosed = false;
- boolean wasPoolClosed = false;
-
- public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
- super(maxLeases, topics, kafkaProperties, null);
- actualMaxLeases = maxLeases;
- actualTopics = topics;
- actualKafkaProperties = kafkaProperties;
- }
-
- @Override
- public ConsumerLease obtainConsumer() {
- return new ConsumerLease() {
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
- if (throwKafkaExceptionOnPoll) {
- throw new KafkaException("i planned to fail");
- }
- final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
- return (records == null) ? ConsumerRecords.empty() : records;
- }
-
- @Override
- public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
- if (throwKafkaExceptionOnCommit) {
- throw new KafkaException("i planned to fail");
- }
- actualCommitOffsets = offsets;
- }
-
- @Override
- public void poison() {
- wasConsumerLeasePoisoned = true;
- }
-
- @Override
- public void close() {
- wasConsumerLeaseClosed = true;
- }
- };
- }
-
- @Override
- public void close() {
- wasPoolClosed = true;
- }
-
- void resetState() {
- throwKafkaExceptionOnPoll = false;
- throwKafkaExceptionOnCommit = false;
- nextPlannedRecordsQueue = null;
- nextExpectedCommitOffsets = null;
- wasConsumerLeasePoisoned = false;
- wasConsumerLeaseClosed = false;
- wasPoolClosed = false;
- }
+ Consumer<byte[], byte[]> mockConsumer = null;
+ ConsumerLease mockLease = null;
+ ConsumerPool mockConsumerPool = null;
+ @Before
+ public void setup() {
+ mockConsumer = mock(Consumer.class);
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@@ -175,365 +106,45 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- } else {
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- }
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- }
-
- @Test
- public void validateGetLotsOfMessages() throws Exception {
- String groupName = "validateGetLotsOfMessages";
-
- final byte[][] firstPassValues = new byte[10010][1];
- for (final byte[] value : firstPassValues) {
- value[0] = 0x12;
- }
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
- assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
- assertEquals(1, mockPool.actualCommitOffsets.size());
- assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
- for (final byte[] rawRecord : rawRecords) {
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
-
- for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
- final byte[] key = entry.getKey();
- final byte[] rawRecord = entry.getValue();
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- for (final ConsumerRecords<byte[], byte[]> rec : records) {
- rec.partitions().stream().forEach((part) -> {
- final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
- if (map.get(part) != null) {
- throw new IllegalStateException("already have that topic/partition in the record map");
- }
- map.put(part, conRecs);
- });
- }
- return new ConsumerRecords<>(map);
- }
-
- @Test
- public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
- String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues),
- createConsumerRecords("bar", 1, 1L, secondPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
-
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(2, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- }
-
- @Test
- public void validatePollException() throws Exception {
- String groupName = "validatePollException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnPoll = true;
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(0, flowFiles.size());
- assertNull(null, mockPool.actualCommitOffsets);
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateCommitOffsetException() throws Exception {
- String groupName = "validateCommitOffsetException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnCommit = true;
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(1, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertNull(null, mockPool.actualCommitOffsets);
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
@Test
- public void validateUtf8Key() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
+ public void validateGetErrorMessages() throws Exception {
+ String groupName = "validateGetErrorMessages";
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(true, false);
+ when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka proc = new ConsumeKafka() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -542,89 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(2)).continuePolling();
+ verify(mockLease, times(1)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
- @Test
- public void validateHexKey() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka proc = new ConsumeKafka() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
+ ProcessSession mockSession = null;
+ ProvenanceReporter mockReporter = null;
+ ConsumerPool testPool = null;
+ ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
- }
-
- @Test
- public void validatePoolSimpleCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+ mockSession = mock(ProcessSession.class);
+ mockReporter = mock(ProvenanceReporter.class);
+ when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+ testPool = new ConsumerPool(
+ 1,
+ null,
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
+ testDemarcatedPool = new ConsumerPool(
+ 1,
+ "--demarcator--".getBytes(StandardCharsets.UTF_8),
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer;
+ }
+ };
+ }
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ @Test
+ public void validatePoolSimpleCreateClose() throws Exception {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
- lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
- assertEquals(1, stats.leasesObtainedCount);
- assertEquals(1, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
+ assertEquals(4, stats.leasesObtainedCount);
}
@Test
- public void validatePoolSimpleBatchCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolSimpleCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ lease.commit();
+ }
+ testPool.close();
+ verify(mockSession, times(3)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ @Test
+ public void validatePoolSimpleBatchCreateClose() throws Exception {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
- lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
- assertEquals(10000, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
@Test
- public void validatePoolConsumerFails() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolBatchCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
- fail();
- } catch (final KafkaException ke) {
+ lease.commit();
+ }
+ testDemarcatedPool.close();
+ verify(mockSession, times(1)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testDemarcatedPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+
+ @Test
+ public void validatePoolConsumerFails() throws Exception {
+
+ when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ try {
+ lease.poll();
+ fail();
+ } catch (final KafkaException ke) {
+ }
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
- assertEquals(0, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+ final TopicPartition tPart = new TopicPartition(topic, partition);
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ long offset = startingOffset;
+ for (final byte[] rawRecord : rawRecords) {
+ final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+ records.add(rec);
+ }
+ map.put(tPart, records);
+ return new ConsumerRecords(map);
+ }
+
}
[3/3] nifi git commit: NIFI-2732 ensure session and consumer aligned
and has registered rebalance listener. Make consumption far more memory and
process efficient, fixed extraneous getbundled call
Posted by bb...@apache.org.
NIFI-2732 ensure session and consumer aligned and has registered rebalance listener. Make consumption far more memory and process efficient, fixed extraneous getbundled call
This closes #987.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7a451935
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7a451935
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7a451935
Branch: refs/heads/master
Commit: 7a451935a583d4d7fd4eccf47a869552c2f8dda7
Parents: 0881254
Author: joewitt <jo...@apache.org>
Authored: Wed Aug 31 15:25:12 2016 +1000
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Sep 6 15:48:40 2016 -0400
----------------------------------------------------------------------
.../kafka/pubsub/ConsumeKafka_0_10.java | 265 ++-------
.../processors/kafka/pubsub/ConsumerLease.java | 359 +++++++++++-
.../processors/kafka/pubsub/ConsumerPool.java | 285 +++++-----
.../kafka/pubsub/KafkaProcessorUtils.java | 4 +
.../kafka/pubsub/PublishKafka_0_10.java | 6 +-
.../kafka/pubsub/ConsumeKafkaTest.java | 548 ++----------------
.../kafka/pubsub/ConsumerPoolTest.java | 172 ++++--
.../kafka/pubsub/PublishKafkaTest.java | 2 +-
.../processors/kafka/pubsub/ConsumeKafka.java | 268 ++-------
.../processors/kafka/pubsub/ConsumerLease.java | 359 +++++++++++-
.../processors/kafka/pubsub/ConsumerPool.java | 285 +++++-----
.../kafka/pubsub/KafkaProcessorUtils.java | 6 +-
.../kafka/pubsub/ConsumeKafkaTest.java | 555 ++-----------------
.../kafka/pubsub/ConsumerPoolTest.java | 172 ++++--
14 files changed, 1413 insertions(+), 1873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 847f8a4..e799876 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -25,13 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -53,7 +46,8 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10 Consumer API. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
@@ -63,7 +57,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
- + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+ + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
public class ConsumeKafka_0_10 extends AbstractProcessor {
- private static final long FIVE_MB = 5L * 1024L * 1024L;
-
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
- "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
+
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+ .name("max-uncommit-offset-wait")
+ .displayName("Max Uncommitted Time")
+ .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ + "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ + "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ + "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ + "than when we're not as there is much less for us to keep track of in memory.")
+ .required(false)
+ .defaultValue("1 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
- private volatile byte[] demarcatorBytes = null;
private volatile ConsumerPool consumerPool = null;
static {
@@ -165,6 +167,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(MAX_POLL_RECORDS);
+ descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
}
@@ -179,16 +182,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return DESCRIPTORS;
}
- @OnScheduled
- public void prepareProcessing(final ProcessContext context) {
- this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
- ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
- : null;
- }
-
@OnStopped
public void close() {
- demarcatorBytes = null;
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
return pool;
}
+ return consumerPool = createConsumerPool(context, getLogger());
+ }
+
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ final int maxLeases = context.getMaxConcurrentTasks();
+ final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+ final byte[] demarcator = context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).isSet()
+ ? context.getProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+ : null;
final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
- final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -225,212 +232,40 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
topics.add(trimmedName);
}
}
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
- }
+ final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+ final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return new ConsumerPool(maxLeases, topics, props, log);
+ return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- final long startTimeNanos = System.nanoTime();
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
- try (final ConsumerLease lease = pool.obtainConsumer()) {
- try {
- if (lease == null) {
- context.yield();
- return;
- }
- final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
- if (!foundData) {
- session.rollback();
- return;
- }
-
- writeSessionData(context, session, partitionRecordMap, startTimeNanos);
- //At-least once commit handling (if order is reversed it is at-most once)
- session.commit();
- commitOffsets(lease, partitionRecordMap);
- } catch (final KafkaException ke) {
- lease.poison();
- getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+ try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+ if (lease == null) {
context.yield();
- session.rollback();
+ return;
}
- }
- }
-
- private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
- final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach((entry) -> {
- long maxOffset = entry.getValue().stream()
- .mapToLong(record -> record.offset())
- .max()
- .getAsLong();
- partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
- });
- lease.commitOffsets(partOffsetMap);
- }
-
- private void writeSessionData(
- final ProcessContext context, final ProcessSession session,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
- final long startTimeNanos) {
- if (demarcatorBytes != null) {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach(entry -> {
- writeData(context, session, entry.getValue(), startTimeNanos);
- });
- } else {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .flatMap(entry -> entry.getValue().stream())
- .forEach(record -> {
- writeData(context, session, Collections.singletonList(record), startTimeNanos);
- });
- }
- }
-
- private String encodeKafkaKey(final byte[] key, final String encoding) {
- if (key == null) {
- return null;
- }
-
- if (HEX_ENCODING.getValue().equals(encoding)) {
- return DatatypeConverter.printHexBinary(key);
- } else if (UTF8_ENCODING.getValue().equals(encoding)) {
- return new String(key, StandardCharsets.UTF_8);
- } else {
- return null; // won't happen because it is guaranteed by the Allowable Values
- }
- }
-
- private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
- final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
- final String offset = String.valueOf(firstRecord.offset());
- final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
- final String topic = firstRecord.topic();
- final String partition = String.valueOf(firstRecord.partition());
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, out -> {
- boolean useDemarcator = false;
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- if (useDemarcator) {
- out.write(demarcatorBytes);
- }
- out.write(record.value());
- useDemarcator = true;
- }
- });
- final Map<String, String> kafkaAttrs = new HashMap<>();
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
- if (keyValue != null && records.size() == 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
- }
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
- if (records.size() > 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
- }
- flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
- final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(
- context.getProperty(SECURITY_PROTOCOL).getValue(),
- context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
- topic);
- session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
- this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
- new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
- session.transfer(flowFile, REL_SUCCESS);
- }
-
- /**
- * Populates the given partitionRecordMap with new records until we poll
- * that returns no records or until we have enough data. It is important to
- * ensure we keep items grouped by their topic and partition so that when we
- * bundle them we bundle them intelligently and so that we can set offsets
- * properly even across multiple poll calls.
- */
- private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
- final long startNanos = System.nanoTime();
- boolean foundData = false;
- ConsumerRecords<byte[], byte[]> records;
- final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
- do {
- records = lease.poll();
-
- for (final TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
- if (currList == null) {
- currList = new ArrayList<>();
- partitionRecordMap.put(partition, currList);
+ try {
+ while (this.isScheduled() && lease.continuePolling()) {
+ lease.poll();
}
- currList.addAll(records.records(partition));
- if (currList.size() > 0) {
- foundData = true;
+ if (this.isScheduled() && !lease.commit()) {
+ context.yield();
}
- }
- //If we received data and we still want to get more
- } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
- return foundData;
- }
-
- /**
- * Determines if we have enough data as-is and should move on.
- *
- * @return true if we've been gathering for more than 500 ms or if we're
- * demarcating and have more than 50 flowfiles worth or if we're per message
- * and have more than 2000 flowfiles or if totalMessageSize is greater than
- * two megabytes; false otherwise
- *
- * Implementation note: 500 millis and 5 MB are magic numbers. These may
- * need to be tuned. They get at how often offsets will get committed to
- * kafka relative to how many records will get buffered into memory in a
- * poll call before writing to repos.
- */
- private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final int maxRecords, final long startTimeNanos) {
-
- final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
- if (durationMillis > 500) {
- return true;
- }
-
- int topicPartitionsFilled = 0;
- int totalRecords = 0;
- long totalRecordSize = 0;
-
- for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
- if (!recordList.isEmpty()) {
- topicPartitionsFilled++;
- }
- totalRecords += recordList.size();
- for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
- totalRecordSize += rec.value().length;
+ } catch (final KafkaException kex) {
+ getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+ new Object[]{lease, kex}, kex);
+ } catch (final Throwable t) {
+ getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+ new Object[]{lease, t}, t);
}
}
-
- if (demarcatorBytes != null && demarcatorBytes.length > 0) {
- return topicPartitionsFilled > 50;
- } else if (totalRecordSize > FIVE_MB) {
- return true;
- } else {
- return totalRecords > maxRecords;
- }
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..f7a2e57 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread a time.
*/
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+ private final long maxWaitMillis;
+ private final Consumer<byte[], byte[]> kafkaConsumer;
+ private final ComponentLog logger;
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
+ private boolean poisoned = false;
+ //used for tracking demarcated flowfiles to their TopicPartition so we can append
+ //to them on subsequent poll calls
+ private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+ private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+ private long leaseStartNanos = -1;
+ private boolean lastPollEmpty = false;
+ private int totalFlowFiles = 0;
+
+ ConsumerLease(
+ final long maxWaitMillis, //upper bound in millis that continuePolling() lets one lease keep polling
+ final Consumer<byte[], byte[]> kafkaConsumer,
+ final byte[] demarcatorBytes, //null means one flowfile per message; otherwise written between bundled messages
+ final String keyEncoding,
+ final String securityProtocol, //within this class only used to build the provenance transit URI
+ final String bootstrapServers, //within this class only used to build the provenance transit URI
+ final ComponentLog logger) {
+ this.maxWaitMillis = maxWaitMillis;
+ this.kafkaConsumer = kafkaConsumer;
+ this.demarcatorBytes = demarcatorBytes;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.logger = logger;
+ }
+
+ /**
+ * Clears this lease's per-use tracking state (bundles, uncommitted offsets, timer,
+ * counters); the session and consumer are excluded as those are managed by the pool itself.
+ */
+ private void resetInternalState() {
+ bundleMap.clear();
+ uncommittedOffsetsMap.clear();
+ leaseStartNanos = -1; //negative marks the timer as not started; continuePolling() starts it lazily
+ lastPollEmpty = false;
+ totalFlowFiles = 0;
+ }
+
+ /**
+ * Kafka will call this method whenever it is about to rebalance the
+ * consumers for the given partitions. We'll simply take this to mean that
+ * we need to quickly commit what we've got so far. This method will be
+ * called during the poll() method call of this class and will be called by
+ * the same thread calling poll according to the Kafka API docs. This method
+ * only logs and calls commit(); it does not itself close this lease or
+ * return the consumer to the pool.
+ *
+ * @param partitions partitions being reassigned
+ */
+ @Override
+ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer}); //NOTE(review): "Paritions" is a typo in the emitted log text
+ //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
+ commit();
+ }
/**
- * Executes a poll on the underlying Kafka Consumer.
+ * This will be called by Kafka when the rebalance has completed. We don't
+ * need to do anything with this information other than optionally log it as
+ * by this point we've committed what we've got and moved on.
*
- * @return ConsumerRecords retrieved in the poll.
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * @param partitions topic partition set being reassigned
*/
- ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+ @Override
+ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ }
+
+ /**
+ * Executes a poll on the underlying Kafka Consumer and creates any new
+ * flowfiles necessary or appends to existing ones if in demarcation mode.
+ */
+ void poll() {
+ /**
+ * Implementation note: If we take too long (30 secs?) between kafka
+ * poll calls and our own record processing to any subsequent poll calls
+ * or the commit we can run into a situation where the commit will
+ * succeed to the session but fail on committing offsets. This is
+ * apparently different than the Kafka scenario of electing to rebalance
+ * for other reasons but in this case is due to a session timeout. It
+ * appears Kafka KIP-62 aims to offer more control over the meaning of
+ * various timeouts. If we do run into this case it could result in
+ * duplicates.
+ */
+ try {
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ lastPollEmpty = records.count() == 0; //remembered so continuePolling() can stop after an empty poll
+ processRecords(records);
+ } catch (final Throwable t) {
+ this.poison(); //any failure while polling or processing marks this lease as poisoned before rethrowing
+ throw t;
+ }
+ }
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
* kafka client to collect more data from Kafka before committing the
* offsets.
*
- * @param offsets offsets
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * @return false if there was nothing to commit, in which case the caller
+ * should probably yield; true if new data was committed
+ *
+ */
+ boolean commit() {
+ if (uncommittedOffsetsMap.isEmpty()) {
+ resetInternalState();
+ return false;
+ }
+ try {
+ /**
+ * Committing the nifi session then the offsets means we have an at
+ * least once guarantee here. If we reversed the order we'd have at
+ * most once.
+ */
+ final Collection<FlowFile> bundledFlowFiles = getBundles();
+ if (!bundledFlowFiles.isEmpty()) {
+ getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+ }
+ getProcessSession().commit();
+ kafkaConsumer.commitSync(uncommittedOffsetsMap);
+ resetInternalState();
+ return true;
+ } catch (final KafkaException kex) {
+ poison();
+ logger.warn("Duplicates are likely as we were able to commit the process"
+ + " session but received an exception from Kafka while committing"
+ + " offsets.");
+ throw kex;
+ } catch (final Throwable t) {
+ poison();
+ throw t;
+ }
+ }
+
+ /**
+ * Indicates whether we should continue polling for data. If we are not
+ * writing data with a demarcator then we're writing individual flow files
+ * per kafka message therefore we must be very mindful of memory usage for
+ * the flow file objects (not their content) being held in memory. The
+ * content of kafka messages will be written to the content repository
+ * immediately upon each poll call but we must still be mindful of how much
+ * memory can be used in each poll call. We will indicate that we should
+ * stop polling if our last poll call produced no new results or if we've
+ * been polling and processing data longer than the specified maximum
+ * polling time or if we have reached our max flow file or bundle limits.
+ * NOTE(review): a rebalance is not checked here; it forces a commit via
+ * onPartitionsRevoked instead.
+ *
+ * @return true if should keep polling; false otherwise
+ */
+ boolean continuePolling() {
+ //stop if the last poll produced no new data
+ if (lastPollEmpty) {
+ return false;
+ }
+
+ //stop if we've gone past our desired max uncommitted wait time
+ if (leaseStartNanos < 0) {
+ leaseStartNanos = System.nanoTime(); //first check since the last reset starts the timer
+ }
+ final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ if (durationMillis > maxWaitMillis) {
+ return false;
+ }
+
+ //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+ if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+ return false;
+ } else {
+ return totalFlowFiles < 15000;//admittedly a magic number - good candidate for processor property
+ }
+ }
+
+ /**
+ * Indicates that the underlying session and consumer should be immediately
+ * considered invalid. Once closed the session will be rolled back and the
+ * pool should destroy the underlying consumer. This is useful if due to
+ * external reasons, such as the processor no longer being scheduled, this
+ * lease should be terminated immediately.
*/
- void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+ private void poison() {
+ poisoned = true;
+ }
/**
- * Notifies that this lease is poisoned and should not be reused.
+ * @return true if this lease has been poisoned; false otherwise
*/
- void poison();
+ boolean isPoisoned() {
+ return poisoned;
+ }
/**
- * Notifies that this lease is to be returned. The pool may optionally reuse
- * this lease with another client. No further references by the caller
- * should occur after calling close.
+ * Base implementation only clears internal tracking state. It is intended to
+ * be overridden by the pool that created this ConsumerLease object, which
+ * should ensure that the session given to this lease is rolled back and that
+ * the underlying kafka consumer is either returned to the pool for continued
+ * use or destroyed if this lease has been poisoned. It can only be called
+ * once. Calling it more than once can result in undefined and non threadsafe behavior.
*/
@Override
- void close();
+ public void close() {
+ resetInternalState();
+ }
+
+ public abstract ProcessSession getProcessSession();
+
+ private void processRecords(final ConsumerRecords<byte[], byte[]> records) { //per partition: records the offset to commit and writes the messages into the session
+
+ records.partitions().stream().forEach(partition -> {
+ List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+ if (!messages.isEmpty()) {
+ //update maximum offset map for this topic partition
+ long maxOffset = messages.stream()
+ .mapToLong(record -> record.offset())
+ .max()
+ .getAsLong();
+ uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L)); //highest offset seen plus one; presumably the next offset to consume - confirm against Kafka commit semantics
+
+ //write records to content repository and session
+ if (demarcatorBytes == null) {
+ totalFlowFiles += messages.size(); //no demarcator means one flowfile per message
+ messages.stream().forEach(message -> {
+ writeData(getProcessSession(), message, partition);
+ });
+ } else { //demarcated: all of this partition's messages are appended to one bundle flowfile
+ writeData(getProcessSession(), messages, partition);
+ }
+ }
+ });
+ }
+
+ private static String encodeKafkaKey(final byte[] key, final String encoding) { //renders the raw key as a hex or UTF-8 string per the given encoding
+ if (key == null) { //a null key stays null
+ return null;
+ }
+
+ if (HEX_ENCODING.getValue().equals(encoding)) {
+ return DatatypeConverter.printHexBinary(key);
+ } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+ return new String(key, StandardCharsets.UTF_8);
+ } else {
+ return null; // won't happen because it is guaranteed by the Allowable Values
+ }
+ }
+
+ private Collection<FlowFile> getBundles() { //finalizes attributes on every tracked bundle and returns the resulting flowfiles
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ for (final BundleTracker tracker : bundleMap.values()) {
+ populateAttributes(tracker); //replaces tracker.flowFile with the attributed version, so it must be read afterwards
+ flowFiles.add(tracker.flowFile);
+ }
+ return flowFiles;
+ }
+
+ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) { //non-demarcated path: one flowfile per kafka message, transferred right away
+ FlowFile flowFile = session.create();
+ final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding); //short-lived tracker; it is never stored in bundleMap
+ tracker.incrementRecordCount(1);
+ flowFile = session.write(flowFile, out -> {
+ out.write(record.value());
+ });
+ tracker.updateFlowFile(flowFile);
+ populateAttributes(tracker);
+ session.transfer(tracker.flowFile, REL_SUCCESS);
+ }
+
+ private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) { //demarcated path: appends to this partition's bundle flowfile, creating it on first use
+ final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0); //assumes records is non-empty; the visible caller checks isEmpty() first
+ final boolean demarcateFirstRecord;
+ BundleTracker tracker = bundleMap.get(topicPartition);
+ FlowFile flowFile;
+ if (tracker == null) {
+ tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+ flowFile = session.create();
+ tracker.updateFlowFile(flowFile);
+ demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+ } else {
+ demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+ }
+ flowFile = tracker.flowFile;
+ tracker.incrementRecordCount(records.size());
+ flowFile = session.append(flowFile, out -> {
+ boolean useDemarcator = demarcateFirstRecord;
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ if (useDemarcator) {
+ out.write(demarcatorBytes);
+ }
+ out.write(record.value());
+ useDemarcator = true; //every record after the first written one is preceded by the demarcator
+ }
+ });
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(topicPartition, tracker);
+ }
+
+ private void populateAttributes(final BundleTracker tracker) { //sets the kafka.* attributes, emits the provenance receive event and stores the updated flowfile on the tracker
+ final Map<String, String> kafkaAttrs = new HashMap<>();
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ if (tracker.key != null && tracker.totalRecords == 1) { //key is only recorded for a single-record flowfile
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+ if (tracker.totalRecords > 1) { //count is only recorded when more than one record was bundled
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ }
+ final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+ final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos); //NOTE(review): leaseStartNanos is -1 after resetInternalState(), e.g. after a commit() forced by onPartitionsRevoked during poll(); the duration is then meaningless - confirm and guard
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+ getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+ tracker.updateFlowFile(newFlowFile);
+ }
+
+ private static class BundleTracker { //mutable holder tying a flowfile to the topic/partition, first offset and record count it represents
+
+ final long initialOffset; //offset of the first record given to this tracker
+ final int partition;
+ final String topic;
+ final String key; //encoded key of the first record only; null when that record had no key
+ FlowFile flowFile; //latest flowfile version; replaced after every session operation that returns a new one
+ long totalRecords = 0;
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this.initialOffset = initialRecord.offset();
+ this.partition = topicPartition.partition();
+ this.topic = topicPartition.topic();
+ this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+ }
+
+ private void incrementRecordCount(final long count) {
+ totalRecords += count;
+ }
+
+ private void updateFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 3f20b8f..fba8cb5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -21,18 +21,15 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.nifi.logging.ComponentLog;
import java.io.Closeable;
-import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessSession;
/**
* A pool of Kafka Consumers for a given topic. Consumers can be obtained by
@@ -41,176 +38,118 @@ import org.apache.kafka.common.TopicPartition;
*/
public class ConsumerPool implements Closeable {
- private final AtomicInteger activeLeaseCount = new AtomicInteger(0);
- private final int maxLeases;
- private final Queue<ConsumerLease> consumerLeases;
+ private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics;
private final Map<String, Object> kafkaProperties;
+ private final long maxWaitMillis;
private final ComponentLog logger;
-
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
- private final AtomicLong productivePollCountRef = new AtomicLong();
- private final AtomicLong unproductivePollCountRef = new AtomicLong();
/**
* Creates a pool of KafkaConsumer objects that will grow up to the maximum
- * indicated leases. Consumers are lazily initialized.
+ * indicated threads from the given context. Consumers are lazily
+ * initialized. We may elect to not create up to the maximum number of
+ * configured consumers if the broker reported lag time for all topics is
+ * below a certain threshold.
*
- * @param maxLeases maximum number of active leases in the pool
- * @param topics the topics to consume from
- * @param kafkaProperties the properties for each consumer
+ * @param maxConcurrentLeases max allowable consumers at once
+ * @param demarcator bytes to use as demarcator between messages; null or
+ * empty means no demarcator
+ * @param kafkaProperties properties to use to initialize kafka consumers
+ * @param topics the topics to subscribe to
+ * @param maxWaitMillis maximum time to wait for a given lease to acquire
+ * data before committing
+ * @param keyEncoding the encoding to use for the key of a kafka message if
+ * found
+ * @param securityProtocol the security protocol used
+ * @param bootstrapServers the bootstrap servers
* @param logger the logger to report any errors/warnings
*/
- public ConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> kafkaProperties, final ComponentLog logger) {
- this.maxLeases = maxLeases;
- if (maxLeases <= 0) {
- throw new IllegalArgumentException("Max leases value must be greather than zero.");
- }
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final byte[] demarcator,
+ final Map<String, String> kafkaProperties,
+ final List<String> topics,
+ final long maxWaitMillis,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
- if (topics == null || topics.isEmpty()) {
- throw new IllegalArgumentException("Must have a list of one or more topics");
- }
- this.topics = topics;
- this.kafkaProperties = new HashMap<>(kafkaProperties);
- this.consumerLeases = new ArrayDeque<>();
+ this.demarcatorBytes = demarcator;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = Collections.unmodifiableList(topics);
}
/**
- * Obtains a consumer from the pool if one is available
+ * Obtains a consumer from the pool if one is available or lazily
+ * initializes a new one if deemed necessary.
*
- * @return consumer from the pool
- * @throws IllegalArgumentException if pool already contains
+ * @param session the session for which the consumer lease will be
+ * associated
+ * @return consumer to use or null if not available or necessary
*/
- public ConsumerLease obtainConsumer() {
- final ConsumerLease lease;
- final int activeLeases;
- synchronized (this) {
- lease = consumerLeases.poll();
- activeLeases = activeLeaseCount.get();
- }
- if (lease == null && activeLeases >= maxLeases) {
- logger.warn("No available consumers and cannot create any as max consumer leases limit reached - verify pool settings");
- return null;
+ public ConsumerLease obtainConsumer(final ProcessSession session) {
+ SimpleConsumerLease lease = pooledLeases.poll();
+ if (lease == null) {
+ final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+ consumerCreatedCountRef.incrementAndGet();
+ /**
+ * For now return a new consumer lease. But we could later elect to
+ * have this return null if we determine the broker indicates that
+ * the lag time on all topics being monitored is sufficiently low.
+ * For now we should encourage conservative use of threads because
+ * having too many means we'll have at best useless threads sitting
+ * around doing frequent network calls and at worst having consumers
+ * sitting idle which could prompt excessive rebalances.
+ */
+ lease = new SimpleConsumerLease(consumer);
+ /**
+ * This subscription tightly couples the lease to the given
+ * consumer. They cannot be separated from then on.
+ */
+ consumer.subscribe(topics, lease);
}
+ lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet();
- return (lease == null) ? createConsumer() : lease;
+ return lease;
}
+ /**
+ * Exposed as protected method for easier unit testing
+ *
+ * @return consumer
+ * @throws KafkaException presumably if the consumer cannot be constructed - confirm; subscribing to topics happens in obtainConsumer, not here
+ */
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return new KafkaConsumer<>(kafkaProperties);
}
- private ConsumerLease createConsumer() {
- final Consumer<byte[], byte[]> kafkaConsumer = createKafkaConsumer();
- consumerCreatedCountRef.incrementAndGet();
- try {
- kafkaConsumer.subscribe(topics);
- } catch (final KafkaException kex) {
- try {
- kafkaConsumer.close();
- consumerClosedCountRef.incrementAndGet();
- } catch (final Exception ex) {
- consumerClosedCountRef.incrementAndGet();
- //ignore
- }
- throw kex;
- }
-
- final ConsumerLease lease = new ConsumerLease() {
-
- private volatile boolean poisoned = false;
- private volatile boolean closed = false;
-
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
-
- if (poisoned) {
- throw new KafkaException("The consumer is poisoned and should no longer be used");
- }
-
- try {
- final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(50);
- if (records.isEmpty()) {
- unproductivePollCountRef.incrementAndGet();
- } else {
- productivePollCountRef.incrementAndGet();
- }
- return records;
- } catch (final KafkaException kex) {
- logger.warn("Unable to poll from Kafka consumer so will poison and close this " + kafkaConsumer, kex);
- poison();
- close();
- throw kex;
- }
- }
-
- @Override
- public void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-
- if (poisoned) {
- throw new KafkaException("The consumer is poisoned and should no longer be used");
- }
- try {
- kafkaConsumer.commitSync(offsets);
- } catch (final KafkaException kex) {
- logger.warn("Unable to commit kafka consumer offsets so will poison and close this " + kafkaConsumer, kex);
- poison();
- close();
- throw kex;
- }
- }
-
- @Override
- public void close() {
- if (closed) {
- return;
- }
- if (poisoned || activeLeaseCount.get() > maxLeases) {
- closeConsumer(kafkaConsumer);
- activeLeaseCount.decrementAndGet();
- closed = true;
- } else {
- final boolean added;
- synchronized (ConsumerPool.this) {
- added = consumerLeases.offer(this);
- }
- if (!added) {
- closeConsumer(kafkaConsumer);
- activeLeaseCount.decrementAndGet();
- }
- }
- }
-
- @Override
- public void poison() {
- poisoned = true;
- }
- };
- activeLeaseCount.incrementAndGet();
- return lease;
- }
-
/**
- * Closes all consumers in the pool. Can be safely recalled.
+ * Closes all consumers in the pool. Can be safely called repeatedly.
*/
@Override
public void close() {
- final List<ConsumerLease> leases = new ArrayList<>();
- synchronized (this) {
- ConsumerLease lease = null;
- while ((lease = consumerLeases.poll()) != null) {
- leases.add(lease);
- }
- }
- for (final ConsumerLease lease : leases) {
- lease.poison();
- lease.close();
- }
+ final List<SimpleConsumerLease> leases = new ArrayList<>();
+ pooledLeases.drainTo(leases);
+ leases.stream().forEach((lease) -> {
+ lease.close(true);
+ });
}
private void closeConsumer(final Consumer consumer) {
+ consumerClosedCountRef.incrementAndGet();
try {
consumer.unsubscribe();
} catch (Exception e) {
@@ -219,15 +158,55 @@ public class ConsumerPool implements Closeable {
try {
consumer.close();
- consumerClosedCountRef.incrementAndGet();
} catch (Exception e) {
- consumerClosedCountRef.incrementAndGet();
logger.warn("Failed while closing " + consumer, e);
}
}
PoolStats getPoolStats() {
- return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get(), productivePollCountRef.get(), unproductivePollCountRef.get());
+ return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+ }
+
+ private class SimpleConsumerLease extends ConsumerLease {
+
+ private final Consumer<byte[], byte[]> consumer;
+ private volatile ProcessSession session;
+ private volatile boolean closedConsumer;
+
+ private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+ super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, logger);
+ this.consumer = consumer;
+ }
+
+ void setProcessSession(final ProcessSession session) {
+ this.session = session;
+ }
+
+ @Override
+ public ProcessSession getProcessSession() {
+ return session;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ close(false);
+ }
+
+ public void close(final boolean forceClose) {
+ if (closedConsumer) {
+ return;
+ }
+ super.close();
+ if (session != null) {
+ session.rollback();
+ setProcessSession(null);
+ }
+ if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+ closedConsumer = true;
+ closeConsumer(consumer);
+ }
+ }
}
static final class PoolStats {
@@ -235,30 +214,22 @@ public class ConsumerPool implements Closeable {
final long consumerCreatedCount;
final long consumerClosedCount;
final long leasesObtainedCount;
- final long productivePollCount;
- final long unproductivePollCount;
PoolStats(
final long consumerCreatedCount,
final long consumerClosedCount,
- final long leasesObtainedCount,
- final long productivePollCount,
- final long unproductivePollCount
+ final long leasesObtainedCount
) {
this.consumerCreatedCount = consumerCreatedCount;
this.consumerClosedCount = consumerClosedCount;
this.leasesObtainedCount = leasesObtainedCount;
- this.productivePollCount = productivePollCount;
- this.unproductivePollCount = unproductivePollCount;
}
@Override
public String toString() {
return "Created Consumers [" + consumerCreatedCount + "]\n"
+ "Closed Consumers [" + consumerClosedCount + "]\n"
- + "Leases Obtained [" + leasesObtainedCount + "]\n"
- + "Productive Polls [" + productivePollCount + "]\n"
- + "Unproductive Polls [" + unproductivePollCount + "]\n";
+ + "Leases Obtained [" + leasesObtainedCount + "]\n";
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 3ae7495..707a431 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -55,6 +55,10 @@ final class KafkaProcessorUtils {
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+ static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+ static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+ "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
static final String KAFKA_KEY = "kafka.key";
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index 18f3018..83688b7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -55,6 +55,8 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,10 +106,6 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
"DefaultPartitioner", "Messages will be assigned to random partitions.");
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
- "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name")
[2/3] nifi git commit: NIFI-2732 ensure session and consumer aligned
and has registered rebalance listener. Make consumption far more memory and
process efficient, fixed extraneous getbundled call
Posted by bb...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index a85563d..6fd9053 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,104 +16,36 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.UUID;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
public class ConsumeKafkaTest {
- static class MockConsumerPool extends ConsumerPool {
-
- final int actualMaxLeases;
- final List<String> actualTopics;
- final Map<String, String> actualKafkaProperties;
- boolean throwKafkaExceptionOnPoll = false;
- boolean throwKafkaExceptionOnCommit = false;
- Queue<ConsumerRecords<byte[], byte[]>> nextPlannedRecordsQueue = new ArrayDeque<>();
- Map<TopicPartition, OffsetAndMetadata> nextExpectedCommitOffsets = null;
- Map<TopicPartition, OffsetAndMetadata> actualCommitOffsets = null;
- boolean wasConsumerLeasePoisoned = false;
- boolean wasConsumerLeaseClosed = false;
- boolean wasPoolClosed = false;
-
- public MockConsumerPool(int maxLeases, List<String> topics, Map<String, String> kafkaProperties, ComponentLog logger) {
- super(maxLeases, topics, kafkaProperties, null);
- actualMaxLeases = maxLeases;
- actualTopics = topics;
- actualKafkaProperties = kafkaProperties;
- }
-
- @Override
- public ConsumerLease obtainConsumer() {
- return new ConsumerLease() {
- @Override
- public ConsumerRecords<byte[], byte[]> poll() {
- if (throwKafkaExceptionOnPoll) {
- throw new KafkaException("i planned to fail");
- }
- final ConsumerRecords<byte[], byte[]> records = nextPlannedRecordsQueue.poll();
- return (records == null) ? ConsumerRecords.empty() : records;
- }
-
- @Override
- public void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
- if (throwKafkaExceptionOnCommit) {
- throw new KafkaException("i planned to fail");
- }
- actualCommitOffsets = offsets;
- }
-
- @Override
- public void poison() {
- wasConsumerLeasePoisoned = true;
- }
-
- @Override
- public void close() {
- wasConsumerLeaseClosed = true;
- }
- };
- }
-
- @Override
- public void close() {
- wasPoolClosed = true;
- }
-
- void resetState() {
- throwKafkaExceptionOnPoll = false;
- throwKafkaExceptionOnCommit = false;
- nextPlannedRecordsQueue = null;
- nextExpectedCommitOffsets = null;
- wasConsumerLeasePoisoned = false;
- wasConsumerLeaseClosed = false;
- wasPoolClosed = false;
- }
+ Consumer<byte[], byte[]> mockConsumer = null;
+ ConsumerLease mockLease = null;
+ ConsumerPool mockConsumerPool = null;
+ @Before
+ public void setup() {
+ mockConsumer = mock(Consumer.class);
+ mockLease = mock(ConsumerLease.class);
+ mockConsumerPool = mock(ConsumerPool.class);
}
@Test
@@ -174,31 +106,14 @@ public class ConsumeKafkaTest {
public void validateGetAllMessages() throws Exception {
String groupName = "validateGetAllMessages";
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -207,69 +122,29 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- if (mockPool.nextPlannedRecordsQueue.isEmpty()) {
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-5")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-6")).count());
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- } else {
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- }
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
@Test
- public void validateGetLotsOfMessages() throws Exception {
- String groupName = "validateGetLotsOfMessages";
-
- final byte[][] firstPassValues = new byte[10010][1];
- for (final byte[] value : firstPassValues) {
- value[0] = 0x12;
- }
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
+ public void validateGetErrorMessages() throws Exception {
+ String groupName = "validateGetErrorMessages";
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> secondRecs = createConsumerRecords("bar", 1, 1L, secondPassValues);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
- mockPool.nextPlannedRecordsQueue.add(secondRecs);
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(true, false);
+ when(mockLease.commit()).thenReturn(Boolean.FALSE);
ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
@Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
}
};
final TestRunner runner = TestRunners.newTestRunner(proc);
@@ -278,352 +153,15 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
runner.run(1, false);
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(10010, flowFiles.stream().map(ff -> ff.toByteArray()).filter(content -> content.length == 1 && content[0] == 0x12).count());
- assertEquals(1, mockPool.nextPlannedRecordsQueue.size());
-
- assertEquals(1, mockPool.actualCommitOffsets.size());
- assertEquals(10011L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(2)).continuePolling();
+ verify(mockLease, times(1)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
}
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
- for (final byte[] rawRecord : rawRecords) {
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- final TopicPartition tPart = new TopicPartition(topic, partition);
- final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
- long offset = startingOffset;
-
- for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
- final byte[] key = entry.getKey();
- final byte[] rawRecord = entry.getValue();
- final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
- records.add(rec);
- }
- map.put(tPart, records);
- return new ConsumerRecords(map);
- }
-
- private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
- for (final ConsumerRecords<byte[], byte[]> rec : records) {
- rec.partitions().stream().forEach((part) -> {
- final List<ConsumerRecord<byte[], byte[]>> conRecs = rec.records(part);
- if (map.get(part) != null) {
- throw new IllegalStateException("already have that topic/partition in the record map");
- }
- map.put(part, conRecs);
- });
- }
- return new ConsumerRecords<>(map);
- }
-
- @Test
- public void validateGetAllMessagesWithProvidedDemarcator() throws Exception {
- String groupName = "validateGetAllMessagesWithProvidedDemarcator";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final byte[][] secondPassValues = new byte[][]{
- "Hello-4".getBytes(StandardCharsets.UTF_8),
- "Hello-5".getBytes(StandardCharsets.UTF_8),
- "Hello-6".getBytes(StandardCharsets.UTF_8)
- };
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues),
- createConsumerRecords("bar", 1, 1L, secondPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(2, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-4blahHello-5blahHello-6")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertEquals(2, mockPool.actualCommitOffsets.size());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("foo", 1)).offset());
- assertEquals(4L, mockPool.actualCommitOffsets.get(new TopicPartition("bar", 1)).offset());
- }
-
- @Test
- public void validatePollException() throws Exception {
- String groupName = "validatePollException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnPoll = true;
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(0, flowFiles.size());
- assertNull(null, mockPool.actualCommitOffsets);
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateCommitOffsetException() throws Exception {
- String groupName = "validateCommitOffsetException";
-
- final byte[][] firstPassValues = new byte[][]{
- "Hello-1".getBytes(StandardCharsets.UTF_8),
- "Hello-2".getBytes(StandardCharsets.UTF_8),
- "Hello-3".getBytes(StandardCharsets.UTF_8)
- };
-
- final ConsumerRecords<byte[], byte[]> consumerRecs = mergeRecords(
- createConsumerRecords("foo", 1, 1L, firstPassValues)
- );
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.EMPTY_MAP, null);
- mockPool.nextPlannedRecordsQueue.add(consumerRecs);
- mockPool.throwKafkaExceptionOnCommit = true;
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
-
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.MESSAGE_DEMARCATOR, "blah");
-
- runner.run(1, true);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(1, flowFiles.size());
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1blahHello-2blahHello-3")).count());
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertTrue(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
-
- assertNull(null, mockPool.actualCommitOffsets);
- }
-
- @Test
- public void validateUtf8Key() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
-
- @Test
- public void validateHexKey() {
- String groupName = "validateGetAllMessages";
-
- final Map<byte[], byte[]> rawRecords = new HashMap<>();
- rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
- rawRecords.put(new byte[0], "Hello-2".getBytes());
- rawRecords.put(null, "Hello-3".getBytes());
-
- final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
-
- final List<String> expectedTopics = new ArrayList<>();
- expectedTopics.add("foo");
- expectedTopics.add("bar");
- final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
- mockPool.nextPlannedRecordsQueue.add(firstRecs);
-
- ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
- @Override
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return mockPool;
- }
- };
- final TestRunner runner = TestRunners.newTestRunner(proc);
- runner.setValidateExpressionUsage(false);
- runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
- runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
- runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
- runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
- runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
-
- runner.run(1, false);
-
- final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
-
- assertEquals(expectedTopics, mockPool.actualTopics);
-
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
- assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
-
- final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
-
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
- assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
-
-
- //asert that all consumers were closed as expected
- //assert that the consumer pool was properly closed
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertFalse(mockPool.wasPoolClosed);
- runner.run(1, true);
- assertFalse(mockPool.wasConsumerLeasePoisoned);
- assertTrue(mockPool.wasConsumerLeaseClosed);
- assertTrue(mockPool.wasPoolClosed);
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 7f88ea2..0ebf2b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -16,109 +16,203 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerPoolTest {
Consumer<byte[], byte[]> consumer = null;
+ ProcessSession mockSession = null;
+ ProvenanceReporter mockReporter = null;
+ ConsumerPool testPool = null;
+ ConsumerPool testDemarcatedPool = null;
ComponentLog logger = null;
@Before
public void setup() {
consumer = mock(Consumer.class);
logger = mock(ComponentLog.class);
- }
-
- @Test
- public void validatePoolSimpleCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
+ mockSession = mock(ProcessSession.class);
+ mockReporter = mock(ProvenanceReporter.class);
+ when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+ testPool = new ConsumerPool(
+ 1,
+ null,
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
}
};
+ testDemarcatedPool = new ConsumerPool(
+ 1,
+ "--demarcator--".getBytes(StandardCharsets.UTF_8),
+ Collections.emptyMap(),
+ Collections.singletonList("nifi"),
+ 100L,
+ "utf-8",
+ "ssl",
+ "localhost",
+ logger) {
+ @Override
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ return consumer;
+ }
+ };
+ }
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ @Test
+ public void validatePoolSimpleCreateClose() throws Exception {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ }
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
lease.poll();
- lease.commitOffsets(Collections.emptyMap());
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
- assertEquals(1, stats.leasesObtainedCount);
- assertEquals(1, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
+ assertEquals(4, stats.leasesObtainedCount);
}
@Test
- public void validatePoolSimpleBatchCreateClose() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(5, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolSimpleCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenReturn(ConsumerRecords.empty());
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ lease.poll();
+ lease.commit();
+ }
+ testPool.close();
+ verify(mockSession, times(3)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+ @Test
+ public void validatePoolSimpleBatchCreateClose() throws Exception {
+ when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
for (int i = 0; i < 100; i++) {
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
for (int j = 0; j < 100; j++) {
lease.poll();
}
- lease.commitOffsets(Collections.emptyMap());
}
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(100, stats.leasesObtainedCount);
- assertEquals(10000, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
@Test
- public void validatePoolConsumerFails() throws Exception {
-
- final ConsumerPool testPool = new ConsumerPool(1, Collections.singletonList("nifi"), Collections.emptyMap(), logger) {
- @Override
- protected Consumer<byte[], byte[]> createKafkaConsumer() {
- return consumer;
- }
+ public void validatePoolBatchCreatePollClose() throws Exception {
+ final byte[][] firstPassValues = new byte[][]{
+ "Hello-1".getBytes(StandardCharsets.UTF_8),
+ "Hello-2".getBytes(StandardCharsets.UTF_8),
+ "Hello-3".getBytes(StandardCharsets.UTF_8)
};
+ final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, firstPassValues);
- when(consumer.poll(anyInt())).thenThrow(new KafkaException());
-
- try (final ConsumerLease lease = testPool.obtainConsumer()) {
+ when(consumer.poll(anyLong())).thenReturn(firstRecs, createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+ try (final ConsumerLease lease = testDemarcatedPool.obtainConsumer(mockSession)) {
lease.poll();
- fail();
- } catch (final KafkaException ke) {
+ lease.commit();
+ }
+ testDemarcatedPool.close();
+ verify(mockSession, times(1)).create();
+ verify(mockSession, times(1)).commit();
+ final PoolStats stats = testDemarcatedPool.getPoolStats();
+ assertEquals(1, stats.consumerCreatedCount);
+ assertEquals(1, stats.consumerClosedCount);
+ assertEquals(1, stats.leasesObtainedCount);
+ }
+
+ @Test
+ public void validatePoolConsumerFails() throws Exception {
+
+ when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+ try (final ConsumerLease lease = testPool.obtainConsumer(mockSession)) {
+ try {
+ lease.poll();
+ fail();
+ } catch (final KafkaException ke) {
+ }
}
testPool.close();
+ verify(mockSession, times(0)).create();
+ verify(mockSession, times(0)).commit();
final PoolStats stats = testPool.getPoolStats();
assertEquals(1, stats.consumerCreatedCount);
assertEquals(1, stats.consumerClosedCount);
assertEquals(1, stats.leasesObtainedCount);
- assertEquals(0, stats.unproductivePollCount);
- assertEquals(0, stats.productivePollCount);
}
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+ final TopicPartition tPart = new TopicPartition(topic, partition);
+ final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ long offset = startingOffset;
+ for (final byte[] rawRecord : rawRecords) {
+ final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, UUID.randomUUID().toString().getBytes(), rawRecord);
+ records.add(rec);
+ }
+ map.put(tPart, records);
+ return new ConsumerRecords(map);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index af0d343..b3f1bd1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -358,7 +358,7 @@ public class PublishKafkaTest {
TestRunner runner = TestRunners.newTestRunner(putKafka);
runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
- runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+ runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, KafkaProcessorUtils.HEX_ENCODING);
runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 0a3fe5d..0b8a752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -25,13 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -39,13 +34,11 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -53,17 +46,18 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.9 Consumer API. "
+ " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the mean time"
+ " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on.")
-@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
+@Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9"})
@WritesAttributes({
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
- + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
+ + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -75,18 +69,12 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
public class ConsumeKafka extends AbstractProcessor {
- private static final long TWO_MB = 2L * 1024L * 1024L;
-
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
- static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
- static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
- "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@@ -136,6 +124,7 @@ public class ConsumeKafka extends AbstractProcessor {
+ "will result in a single FlowFile which "
+ "time it is triggered. To enter special character such as 'new line' use CTRL+Enter or Shift+Enter depending on the OS")
.build();
+
static final PropertyDescriptor MAX_POLL_RECORDS = new PropertyDescriptor.Builder()
.name("max.poll.records")
.displayName("Max Poll Records")
@@ -145,6 +134,20 @@ public class ConsumeKafka extends AbstractProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
+ static final PropertyDescriptor MAX_UNCOMMITTED_TIME = new PropertyDescriptor.Builder()
+ .name("max-uncommit-offset-wait")
+ .displayName("Max Uncommitted Time")
+ .description("Specifies the maximum amount of time allowed to pass before offsets must be committed. "
+ + "This value impacts how often offsets will be committed. Committing offsets less often increases "
+ + "throughput but also increases the window of potential data duplication in the event of a rebalance "
+ + "or JVM restart between commits. This value is also related to maximum poll records and the use "
+ + "of a message demarcator. When using a message demarcator we can have far more uncommitted messages "
+ + "than when we're not as there is much less for us to keep track of in memory.")
+ .required(false)
+ .defaultValue("1 secs")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.")
@@ -153,7 +156,6 @@ public class ConsumeKafka extends AbstractProcessor {
static final List<PropertyDescriptor> DESCRIPTORS;
static final Set<Relationship> RELATIONSHIPS;
- private volatile byte[] demarcatorBytes = null;
private volatile ConsumerPool consumerPool = null;
static {
@@ -165,6 +167,7 @@ public class ConsumeKafka extends AbstractProcessor {
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(MESSAGE_DEMARCATOR);
descriptors.add(MAX_POLL_RECORDS);
+ descriptors.add(MAX_UNCOMMITTED_TIME);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
}
@@ -179,16 +182,8 @@ public class ConsumeKafka extends AbstractProcessor {
return DESCRIPTORS;
}
- @OnScheduled
- public void prepareProcessing(final ProcessContext context) {
- this.demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet()
- ? context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
- : null;
- }
-
@OnStopped
public void close() {
- demarcatorBytes = null;
final ConsumerPool pool = consumerPool;
consumerPool = null;
if (pool != null) {
@@ -215,9 +210,21 @@ public class ConsumeKafka extends AbstractProcessor {
return pool;
}
+ return consumerPool = createConsumerPool(context, getLogger());
+ }
+
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ final int maxLeases = context.getMaxConcurrentTasks();
+ final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+ final byte[] demarcator = context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).isSet()
+ ? context.getProperty(ConsumeKafka.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
+ : null;
final Map<String, String> props = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
- final String topicListing = context.getProperty(TOPICS).evaluateAttributeExpressions().getValue();
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ final String topicListing = context.getProperty(ConsumeKafka.TOPICS).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -225,213 +232,40 @@ public class ConsumeKafka extends AbstractProcessor {
topics.add(trimmedName);
}
}
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
- return consumerPool = createConsumerPool(context.getMaxConcurrentTasks(), topics, props, getLogger());
- }
+ final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+ final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
- protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
- return new ConsumerPool(maxLeases, topics, props, log);
+ return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
- final long startTimeNanos = System.nanoTime();
final ConsumerPool pool = getConsumerPool(context);
if (pool == null) {
context.yield();
return;
}
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap = new HashMap<>();
-
- try (final ConsumerLease lease = pool.obtainConsumer()) {
- try {
- if (lease == null) {
- context.yield();
- return;
- }
- final boolean foundData = gatherDataFromKafka(lease, partitionRecordMap, context);
- if (!foundData) {
- session.rollback();
- return;
- }
-
- writeSessionData(context, session, partitionRecordMap, startTimeNanos);
- //At-least once commit handling (if order is reversed it is at-most once)
- session.commit();
- commitOffsets(lease, partitionRecordMap);
- } catch (final KafkaException ke) {
- lease.poison();
- getLogger().error("Problem while accessing kafka consumer " + ke, ke);
+ try (final ConsumerLease lease = pool.obtainConsumer(session)) {
+ if (lease == null) {
context.yield();
- session.rollback();
- }
- }
- }
-
- private void commitOffsets(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap) {
- final Map<TopicPartition, OffsetAndMetadata> partOffsetMap = new HashMap<>();
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach((entry) -> {
- long maxOffset = entry.getValue().stream()
- .mapToLong(record -> record.offset())
- .max()
- .getAsLong();
- partOffsetMap.put(entry.getKey(), new OffsetAndMetadata(maxOffset + 1L));
- });
- lease.commitOffsets(partOffsetMap);
- }
-
- private void writeSessionData(
- final ProcessContext context, final ProcessSession session,
- final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap,
- final long startTimeNanos) {
- if (demarcatorBytes != null) {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .forEach(entry -> {
- writeData(context, session, entry.getValue(), startTimeNanos);
- });
- } else {
- partitionRecordMap.entrySet().stream()
- .filter(entry -> !entry.getValue().isEmpty())
- .flatMap(entry -> entry.getValue().stream())
- .forEach(record -> {
- writeData(context, session, Collections.singletonList(record), startTimeNanos);
- });
- }
- }
-
- private String encodeKafkaKey(final byte[] key, final String encoding) {
- if (key == null) {
- return null;
- }
-
- if (HEX_ENCODING.getValue().equals(encoding)) {
- return DatatypeConverter.printHexBinary(key);
- } else if (UTF8_ENCODING.getValue().equals(encoding)) {
- return new String(key, StandardCharsets.UTF_8);
- } else {
- return null; // won't happen because it is guaranteed by the Allowable Values
- }
- }
-
- private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
- final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
- final String offset = String.valueOf(firstRecord.offset());
- final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
- final String topic = firstRecord.topic();
- final String partition = String.valueOf(firstRecord.partition());
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, out -> {
- boolean useDemarcator = false;
- for (final ConsumerRecord<byte[], byte[]> record : records) {
- if (useDemarcator) {
- out.write(demarcatorBytes);
- }
- out.write(record.value());
- useDemarcator = true;
+ return;
}
- });
- final Map<String, String> kafkaAttrs = new HashMap<>();
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
- if (keyValue != null && records.size() == 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
- }
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
- if (records.size() > 1) {
- kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(records.size()));
- }
- flowFile = session.putAllAttributes(flowFile, kafkaAttrs);
- final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(
- context.getProperty(SECURITY_PROTOCOL).getValue(),
- context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue(),
- topic);
- session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
- this.getLogger().debug("Created {} containing {} messages from Kafka topic {}, partition {}, starting offset {} in {} millis",
- new Object[]{flowFile, records.size(), topic, partition, offset, executionDurationMillis});
- session.transfer(flowFile, REL_SUCCESS);
- }
-
- /**
- * Populates the given partitionRecordMap with new records until we poll
- * that returns no records or until we have enough data. It is important to
- * ensure we keep items grouped by their topic and partition so that when we
- * bundle them we bundle them intelligently and so that we can set offsets
- * properly even across multiple poll calls.
- */
- private boolean gatherDataFromKafka(final ConsumerLease lease, final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, ProcessContext context) {
- final long startNanos = System.nanoTime();
- boolean foundData = false;
- ConsumerRecords<byte[], byte[]> records;
- final int maxRecords = context.getProperty(MAX_POLL_RECORDS).asInteger();
-
- do {
- records = lease.poll();
-
- for (final TopicPartition partition : records.partitions()) {
- List<ConsumerRecord<byte[], byte[]>> currList = partitionRecordMap.get(partition);
- if (currList == null) {
- currList = new ArrayList<>();
- partitionRecordMap.put(partition, currList);
+ try {
+ while (this.isScheduled() && lease.continuePolling()) {
+ lease.poll();
}
- currList.addAll(records.records(partition));
- if (currList.size() > 0) {
- foundData = true;
+ if (this.isScheduled() && !lease.commit()) {
+ context.yield();
}
+ } catch (final KafkaException kex) {
+ getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
+ new Object[]{lease, kex}, kex);
+ } catch (final Throwable t) {
+ getLogger().error("Exception while processing data from kafka so will close the lease {} due to {}",
+ new Object[]{lease, t}, t);
}
- //If we received data and we still want to get more
- } while (!records.isEmpty() && !checkIfGatheredEnoughData(partitionRecordMap, maxRecords, startNanos));
- return foundData;
- }
-
- /**
- * Determines if we have enough data as-is and should move on.
- *
- * @return true if we've been gathering for more than 500 ms or if we're
- * demarcating and have more than 50 flowfiles worth or if we're per message
- * and have more than 2000 flowfiles or if totalMessageSize is greater than
- * two megabytes; false otherwise
- *
- * Implementation note: 500 millis and 5 MB are magic numbers. These may
- * need to be tuned. They get at how often offsets will get committed to
- * kafka relative to how many records will get buffered into memory in a
- * poll call before writing to repos.
- */
- private boolean checkIfGatheredEnoughData(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecordMap, final long maxRecords, final long startTimeNanos) {
-
- final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
-
- if (durationMillis > 500) {
- return true;
- }
-
- int topicPartitionsFilled = 0;
- int totalRecords = 0;
- long totalRecordSize = 0;
-
- for (final List<ConsumerRecord<byte[], byte[]>> recordList : partitionRecordMap.values()) {
- if (!recordList.isEmpty()) {
- topicPartitionsFilled++;
- }
- totalRecords += recordList.size();
- for (final ConsumerRecord<byte[], byte[]> rec : recordList) {
- totalRecordSize += rec.value().length;
- }
- }
-
- if (demarcatorBytes != null && demarcatorBytes.length > 0) {
- return topicPartitionsFilled > 50;
- } else if (totalRecordSize > TWO_MB) {
- return true;
- } else {
- return totalRecords > maxRecords;
}
}
-
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/7a451935/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index b954eba..5b8ba1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -17,11 +17,27 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.io.Closeable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import javax.xml.bind.DatatypeConverter;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafka.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -30,15 +46,108 @@ import org.apache.kafka.common.TopicPartition;
* the lease will be returned to the pool for future use by others. A given
* lease may only belong to a single thread a time.
*/
-public interface ConsumerLease extends Closeable {
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+ private final long maxWaitMillis;
+ private final Consumer<byte[], byte[]> kafkaConsumer;
+ private final ComponentLog logger;
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
+ private boolean poisoned = false;
+ //used for tracking demarcated flowfiles to their TopicPartition so we can append
+ //to them on subsequent poll calls
+ private final Map<TopicPartition, BundleTracker> bundleMap = new HashMap<>();
+ private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+ private long leaseStartNanos = -1;
+ private boolean lastPollEmpty = false;
+ private int totalFlowFiles = 0;
+
+ ConsumerLease(
+ final long maxWaitMillis,
+ final Consumer<byte[], byte[]> kafkaConsumer,
+ final byte[] demarcatorBytes,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger) {
+ this.maxWaitMillis = maxWaitMillis;
+ this.kafkaConsumer = kafkaConsumer;
+ this.demarcatorBytes = demarcatorBytes;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.logger = logger;
+ }
+
+ /**
+ * clears out internal state elements excluding session and consumer as
+ * those are managed by the pool itself
+ */
+ private void resetInternalState() {
+ bundleMap.clear();
+ uncommittedOffsetsMap.clear();
+ leaseStartNanos = -1;
+ lastPollEmpty = false;
+ totalFlowFiles = 0;
+ }
+
+ /**
+ * Kafka will call this method whenever it is about to rebalance the
+ * consumers for the given partitions. We'll simply take this to mean that
+ * we need to quickly commit what we've got and will return the consumer to
+ * the pool. This method will be called during the poll() method call of
+ * this class and will be called by the same thread calling poll according
+ * to the Kafka API docs. After this method executes the session and kafka
+ * offsets are committed and this lease is closed.
+ *
+ * @param partitions partitions being reassigned
+ */
+ @Override
+ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns the partition
+ commit();
+ }
/**
- * Executes a poll on the underlying Kafka Consumer.
+ * This will be called by Kafka when the rebalance has completed. We don't
+ * need to do anything with this information other than optionally log it as
+ * by this point we've committed what we've got and moved on.
*
- * @return ConsumerRecords retrieved in the poll.
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * @param partitions topic partition set being reassigned
*/
- ConsumerRecords<byte[], byte[]> poll() throws KafkaException;
+ @Override
+ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ }
+
+ /**
+ * Executes a poll on the underlying Kafka Consumer and creates any new
+ * flowfiles necessary or appends to existing ones if in demarcation mode.
+ */
+ void poll() {
+ /**
+ * Implementation note: If we take too long (30 secs?) between kafka
+ * poll calls and our own record processing to any subsequent poll calls
+ * or the commit we can run into a situation where the commit will
+ * succeed to the session but fail on committing offsets. This is
+ * apparently different than the Kafka scenario of electing to rebalance
+ * for other reasons but in this case is due to a session timeout. It
+ * appears Kafka KIP-62 aims to offer more control over the meaning of
+ * various timeouts. If we do run into this case it could result in
+ * duplicates.
+ */
+ try {
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ lastPollEmpty = records.count() == 0;
+ processRecords(records);
+ } catch (final Throwable t) {
+ this.poison();
+ throw t;
+ }
+ }
/**
* Notifies Kafka to commit the offsets for the specified topic/partition
@@ -47,22 +156,244 @@ public interface ConsumerLease extends Closeable {
* kafka client to collect more data from Kafka before committing the
* offsets.
*
- * @param offsets offsets
- * @throws KafkaException if issue occurs talking to underlying resource.
+ * Returns false if there was nothing to commit, in which case the caller
+ * should probably yield; returns true if new data was committed.
+ *
+ */
+ boolean commit() {
+ if (uncommittedOffsetsMap.isEmpty()) {
+ resetInternalState();
+ return false;
+ }
+ try {
+ /**
+ * Committing the nifi session then the offsets means we have an at
+ * least once guarantee here. If we reversed the order we'd have at
+ * most once.
+ */
+ final Collection<FlowFile> bundledFlowFiles = getBundles();
+ if (!bundledFlowFiles.isEmpty()) {
+ getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+ }
+ getProcessSession().commit();
+ kafkaConsumer.commitSync(uncommittedOffsetsMap);
+ resetInternalState();
+ return true;
+ } catch (final KafkaException kex) {
+ poison();
+ logger.warn("Duplicates are likely as we were able to commit the process"
+ + " session but received an exception from Kafka while committing"
+ + " offsets.");
+ throw kex;
+ } catch (final Throwable t) {
+ poison();
+ throw t;
+ }
+ }
+
+ /**
+ * Indicates whether we should continue polling for data. If we are not
+ * writing data with a demarcator then we're writing individual flow files
+ * per kafka message therefore we must be very mindful of memory usage for
+ * the flow file objects (not their content) being held in memory. The
+ * content of kafka messages will be written to the content repository
+ * immediately upon each poll call but we must still be mindful of how much
+ * memory can be used in each poll call. We will indicate that we should
+ * stop polling if our last poll call produced no new results or if we've
+ * been polling and processing data longer than the specified maximum polling
+ * time or if we have reached our specified max flow file limit or if a
+ * rebalance has been initiated for one of the partitions we're watching;
+ * otherwise true.
+ *
+ * @return true if should keep polling; false otherwise
+ */
+ boolean continuePolling() {
+ //stop if the last poll produced no new data
+ if (lastPollEmpty) {
+ return false;
+ }
+
+ //stop if we've gone past our desired max uncommitted wait time
+ if (leaseStartNanos < 0) {
+ leaseStartNanos = System.nanoTime();
+ }
+ final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ if (durationMillis > maxWaitMillis) {
+ return false;
+ }
+
+ //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+ if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+ return false;
+ } else {
+ return totalFlowFiles < 15000;//admittedly a magic number - good candidate for processor property
+ }
+ }
+
+ /**
+ * Indicates that the underlying session and consumer should be immediately
+ * considered invalid. Once closed the session will be rolled back and the
+ * pool should destroy the underlying consumer. This is useful if due to
+ * external reasons, such as the processor no longer being scheduled, this
+ * lease should be terminated immediately.
*/
- void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) throws KafkaException;
+ private void poison() {
+ poisoned = true;
+ }
/**
- * Notifies that this lease is poisoned and should not be reused.
+ * @return true if this lease has been poisoned; false otherwise
*/
- void poison();
+ boolean isPoisoned() {
+ return poisoned;
+ }
/**
- * Notifies that this lease is to be returned. The pool may optionally reuse
- * this lease with another client. No further references by the caller
- * should occur after calling close.
+ * Method that is intended to be overridden by the pool that created
+ * this ConsumerLease object. It should ensure that the session given to
+ * create this lease is rolled back and that the underlying kafka consumer
+ * is either returned to the pool for continued use or destroyed if this
+ * lease has been poisoned. It can only be called once. Calling it more than
+ * once can result in undefined and non threadsafe behavior.
*/
@Override
- void close();
+ public void close() {
+ resetInternalState();
+ }
+
+ public abstract ProcessSession getProcessSession();
+
+ private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+
+ records.partitions().stream().forEach(partition -> {
+ List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+ if (!messages.isEmpty()) {
+ //update maximum offset map for this topic partition
+ long maxOffset = messages.stream()
+ .mapToLong(record -> record.offset())
+ .max()
+ .getAsLong();
+ uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+
+ //write records to content repository and session
+ if (demarcatorBytes == null) {
+ totalFlowFiles += messages.size();
+ messages.stream().forEach(message -> {
+ writeData(getProcessSession(), message, partition);
+ });
+ } else {
+ writeData(getProcessSession(), messages, partition);
+ }
+ }
+ });
+ }
+
+ private static String encodeKafkaKey(final byte[] key, final String encoding) {
+ if (key == null) {
+ return null;
+ }
+
+ if (HEX_ENCODING.getValue().equals(encoding)) {
+ return DatatypeConverter.printHexBinary(key);
+ } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+ return new String(key, StandardCharsets.UTF_8);
+ } else {
+ return null; // won't happen because it is guaranteed by the Allowable Values
+ }
+ }
+
+ private Collection<FlowFile> getBundles() {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ for (final BundleTracker tracker : bundleMap.values()) {
+ populateAttributes(tracker);
+ flowFiles.add(tracker.flowFile);
+ }
+ return flowFiles;
+ }
+
+ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+ FlowFile flowFile = session.create();
+ final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+ tracker.incrementRecordCount(1);
+ flowFile = session.write(flowFile, out -> {
+ out.write(record.value());
+ });
+ tracker.updateFlowFile(flowFile);
+ populateAttributes(tracker);
+ session.transfer(tracker.flowFile, REL_SUCCESS);
+ }
+
+ private void writeData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
+ final boolean demarcateFirstRecord;
+ BundleTracker tracker = bundleMap.get(topicPartition);
+ FlowFile flowFile;
+ if (tracker == null) {
+ tracker = new BundleTracker(firstRecord, topicPartition, keyEncoding);
+ flowFile = session.create();
+ tracker.updateFlowFile(flowFile);
+ demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+ } else {
+ demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+ }
+ flowFile = tracker.flowFile;
+ tracker.incrementRecordCount(records.size());
+ flowFile = session.append(flowFile, out -> {
+ boolean useDemarcator = demarcateFirstRecord;
+ for (final ConsumerRecord<byte[], byte[]> record : records) {
+ if (useDemarcator) {
+ out.write(demarcatorBytes);
+ }
+ out.write(record.value());
+ useDemarcator = true;
+ }
+ });
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(topicPartition, tracker);
+ }
+
+ private void populateAttributes(final BundleTracker tracker) {
+ final Map<String, String> kafkaAttrs = new HashMap<>();
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ if (tracker.key != null && tracker.totalRecords == 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+ if (tracker.totalRecords > 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ }
+ final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+ final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+ getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+ tracker.updateFlowFile(newFlowFile);
+ }
+
+ private static class BundleTracker {
+
+ final long initialOffset;
+ final int partition;
+ final String topic;
+ final String key;
+ FlowFile flowFile;
+ long totalRecords = 0;
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this.initialOffset = initialRecord.offset();
+ this.partition = topicPartition.partition();
+ this.topic = topicPartition.topic();
+ this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+ }
+
+ private void incrementRecordCount(final long count) {
+ totalRecords += count;
+ }
+
+ private void updateFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ }
+
+ }
}