You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2020/11/18 15:03:36 UTC
[nifi] branch main updated: NIFI-8021: Provide the ability to pin
partitions to particular hosts when using ConsumeKafka processors
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7848ba5 NIFI-8021: Provide the ability to pin partitions to particular hosts when using ConsumeKafka processors
7848ba5 is described below
commit 7848ba59a250ba4ae98d7062869bcf01a9a58b71
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Nov 17 17:27:39 2020 -0500
NIFI-8021: Provide the ability to pin partitions to particular hosts when using ConsumeKafka processors
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4672.
---
.../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 52 ++++-
.../processors/kafka/pubsub/ConsumeKafka_2_0.java | 55 +++++-
.../kafka/pubsub/ConsumerPartitionsUtil.java | 209 +++++++++++++++++++++
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 64 ++++++-
.../kafka/pubsub/KafkaProcessorUtils.java | 8 +-
.../additionalDetails.html | 55 ++++++
.../additionalDetails.html | 55 ++++++
.../processors/kafka/pubsub/ConsumerPoolTest.java | 4 +-
.../kafka/pubsub/TestConsumerPartitionsUtil.java | 120 ++++++++++++
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 52 ++++-
.../processors/kafka/pubsub/ConsumeKafka_2_6.java | 52 ++++-
.../kafka/pubsub/ConsumerPartitionsUtil.java | 209 +++++++++++++++++++++
.../nifi/processors/kafka/pubsub/ConsumerPool.java | 67 +++++--
.../kafka/pubsub/KafkaProcessorUtils.java | 11 +-
.../additionalDetails.html | 55 ++++++
.../additionalDetails.html | 55 ++++++
.../processors/kafka/pubsub/ConsumerPoolTest.java | 6 +-
.../kafka/pubsub/TestConsumerPartitionsUtil.java | 120 ++++++++++++
18 files changed, 1206 insertions(+), 43 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 79824b1..f279763 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@@ -313,7 +314,26 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- return KafkaProcessorUtils.validateCommonProperties(validationContext);
+ final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+ final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+ validationResults.add(consumerPartitionsResult);
+
+ final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+ if (explicitPartitionMapping) {
+ final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+ if (TOPIC_PATTERN.getValue().equals(topicType)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(TOPIC_TYPE.getDisplayName())
+ .input(TOPIC_PATTERN.getDisplayName())
+ .valid(false)
+ .explanation("It is not valid to explicitly assign Topic Partitions and also use a Topic Pattern. "
+ + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+ .build());
+ }
+ }
+
+ return validationResults;
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -322,7 +342,24 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
return pool;
}
- return consumerPool = createConsumerPool(context, getLogger());
+ final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+ final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+ if (numAssignedPartitions > 0) {
+ // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+ // all of the partitions assigned.
+ final int partitionCount = consumerPool.getPartitionCount();
+ if (partitionCount != numAssignedPartitions) {
+ context.yield();
+ consumerPool.close();
+
+ throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+ " topic(s) have " + partitionCount + " partitions");
+ }
+ }
+
+ this.consumerPool = consumerPool;
+ return consumerPool;
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -355,6 +392,13 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ final int[] partitionsToConsume;
+ try {
+ partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+ } catch (final UnknownHostException uhe) {
+ throw new ProcessException("Could not determine localhost's hostname", uhe);
+ }
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -364,11 +408,11 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index fc00693..a25f0b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -77,7 +78,6 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
public class ConsumeKafka_2_0 extends AbstractProcessor {
-
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -286,7 +286,26 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- return KafkaProcessorUtils.validateCommonProperties(validationContext);
+ final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+ final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+ validationResults.add(consumerPartitionsResult);
+
+ final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+ if (explicitPartitionMapping) {
+ final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+ if (TOPIC_PATTERN.getValue().equals(topicType)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(TOPIC_TYPE.getDisplayName())
+ .input(TOPIC_PATTERN.getDisplayName())
+ .valid(false)
+ .explanation("It is not valid to explicitly assign Topic Partitions and also use a Topic Pattern. "
+ + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+ .build());
+ }
+ }
+
+ return validationResults;
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -295,7 +314,24 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
return pool;
}
- return consumerPool = createConsumerPool(context, getLogger());
+ final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+ final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+ if (numAssignedPartitions > 0) {
+ // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+ // all of the partitions assigned.
+ final int partitionCount = consumerPool.getPartitionCount();
+ if (partitionCount != numAssignedPartitions) {
+ context.yield();
+ consumerPool.close();
+
+ throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+ " topic(s) have " + partitionCount + " partitions");
+ }
+ }
+
+ this.consumerPool = consumerPool;
+ return consumerPool;
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -328,6 +364,13 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+ final int[] partitionsToConsume;
+ try {
+ partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+ } catch (final UnknownHostException uhe) {
+ throw new ProcessException("Could not determine localhost's hostname", uhe);
+ }
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -337,11 +380,11 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
}
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
@@ -379,6 +422,8 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
activeLeases.clear();
}
+
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final ConsumerPool pool = getConsumerPool(context);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
new file mode 100644
index 0000000..bf71e78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+ public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+ public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException {
+ final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties);
+ final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString);
+
+ if (partitionsByHost.isEmpty()) {
+ // Explicit partitioning is not enabled.
+ logger.debug("No explicit Consumer Partitions have been declared.");
+ return null;
+ }
+
+ logger.info("Found the following mapping of hosts to partitions: {}", new Object[] {hostnameToPartitionString});
+
+ // Determine the partitions based on hostname/IP.
+ int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+ if (partitionsForThisHost == null) {
+ throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
+ }
+
+ return partitionsForThisHost;
+ }
+
+ private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) {
+ final Map<String, int[]> partitionsByHost = new HashMap<>();
+ for (final Map.Entry<String, String> entry : partitionValues.entrySet()) {
+ final String host = entry.getKey();
+ final int[] partitions = parsePartitions(host, entry.getValue());
+ partitionsByHost.put(host, partitions);
+ }
+
+ return partitionsByHost;
+ }
+
+ private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException {
+ // Determine the partitions based on hostname/IP.
+ final InetAddress localhost = InetAddress.getLocalHost();
+ int[] partitionsForThisHost = partitionsByHost.get(localhost.getCanonicalHostName());
+ if (partitionsForThisHost != null) {
+ return partitionsForThisHost;
+ }
+
+ partitionsForThisHost = partitionsByHost.get(localhost.getHostName());
+ if (partitionsForThisHost != null) {
+ return partitionsForThisHost;
+ }
+
+ return partitionsByHost.get(localhost.getHostAddress());
+ }
+
+ private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) {
+ final Map<String, String> hostnameToPartitionString = new HashMap<>();
+ for (final Map.Entry<String, String> entry : properties.entrySet()) {
+ final String propertyName = entry.getKey();
+ if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) {
+ continue;
+ }
+
+ if (propertyName.length() <= PARTITION_PROPERTY_NAME_PREFIX.length()) {
+ continue;
+ }
+
+ final String propertyNameAfterPrefix = propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length());
+ hostnameToPartitionString.put(propertyNameAfterPrefix, entry.getValue());
+ }
+
+ return hostnameToPartitionString;
+ }
+
+ private static int[] parsePartitions(final String hostname, final String propertyValue) {
+ final String[] splits = propertyValue.split(",");
+ final List<Integer> partitionList = new ArrayList<>();
+ for (final String split : splits) {
+ if (split.trim().isEmpty()) {
+ continue;
+ }
+
+ try {
+ final int partition = Integer.parseInt(split.trim());
+ if (partition < 0) {
+ throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative");
+ }
+
+ partitionList.add(partition);
+ } catch (final NumberFormatException nfe) {
+ throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer");
+ }
+ }
+
+ // Map out List<Integer> to int[]
+ return partitionList.stream().mapToInt(Integer::intValue).toArray();
+ }
+
+ public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
+ final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+ if (hostnameToPartitionMapping.isEmpty()) {
+ // Partitions are not being explicitly assigned.
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
+ final Set<Integer> partitionsClaimed = new HashSet<>();
+ final Set<Integer> duplicatePartitions = new HashSet<>();
+ for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) {
+ final int[] partitions = parsePartitions(entry.getKey(), entry.getValue());
+ for (final int partition : partitions) {
+ final boolean added = partitionsClaimed.add(partition);
+ if (!added) {
+ duplicatePartitions.add(partition);
+ }
+ }
+ }
+
+ final List<Integer> partitionsMissing = new ArrayList<>();
+ for (int i=0; i < partitionsClaimed.size(); i++) {
+ if (!partitionsClaimed.contains(i)) {
+ partitionsMissing.add(i);
+ }
+ }
+
+ if (!partitionsMissing.isEmpty()) {
+ return new ValidationResult.Builder()
+ .subject("Partitions")
+ .input(partitionsClaimed.toString())
+ .valid(false)
+ .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
+ .build();
+ }
+
+ if (!duplicatePartitions.isEmpty()) {
+ return new ValidationResult.Builder()
+ .subject("Partitions")
+ .input(partitionsClaimed.toString())
+ .valid(false)
+ .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
+ .build();
+ }
+
+ final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+ final int[] partitionsForThisHost;
+ try {
+ partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+ } catch (UnknownHostException e) {
+ return new ValidationResult.Builder()
+ .valid(false)
+ .subject("Partition Assignment")
+ .explanation("Unable to determine hostname of localhost")
+ .build();
+ }
+
+ if (partitionsForThisHost == null) {
+ return new ValidationResult.Builder()
+ .subject("Partition Assignment")
+ .valid(false)
+ .explanation("No assignment was given for this host")
+ .build();
+ }
+
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
+ public static boolean isPartitionAssignmentExplicit(final Map<String, String> properties) {
+ final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+ return !hostnameToPartitionMapping.isEmpty();
+ }
+
+ public static int getPartitionAssignmentCount(final Map<String, String> properties) {
+ final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+ final Map<String, int[]> partitions = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+
+ int count = 0;
+ for (final int[] partitionArray : partitions.values()) {
+ count += partitionArray.length;
+ }
+
+ return count;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 0462729..99293c6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -60,6 +62,7 @@ public class ConsumerPool implements Closeable {
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
private final boolean separateByKey;
+ private final int[] partitionsToConsume;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -97,7 +100,8 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -114,6 +118,7 @@ public class ConsumerPool implements Closeable {
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -129,7 +134,8 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -146,6 +152,7 @@ public class ConsumerPool implements Closeable {
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -162,7 +169,8 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
- final String keyEncoding) {
+ final String keyEncoding,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -179,6 +187,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -195,7 +204,8 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
- final String keyEncoding) {
+ final String keyEncoding,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -212,6 +222,29 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
+ this.partitionsToConsume = partitionsToConsume;
+ }
+
+ public int getPartitionCount() {
+ // If using regex for topic names, just return -1
+ if (topics == null || topics.isEmpty()) {
+ return -1;
+ }
+
+ int partitionsEachTopic = 0;
+ try (final Consumer<byte[], byte[]> consumer = createKafkaConsumer()) {
+ for (final String topicName : topics) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
+ final int partitionsThisTopic = partitionInfos.size();
+ if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
+ throw new IllegalStateException("The specific topic names do not have the same number of partitions");
+ }
+
+ partitionsEachTopic = partitionsThisTopic;
+ }
+ }
+
+ return partitionsEachTopic;
}
/**
@@ -241,12 +274,25 @@ public class ConsumerPool implements Closeable {
*/
lease = new SimpleConsumerLease(consumer);
- // This subscription tightly couples the lease to the given
- // consumer. They cannot be separated from then on.
- if (topics != null) {
- consumer.subscribe(topics, lease);
+ if (partitionsToConsume == null) {
+ // This subscription tightly couples the lease to the given
+ // consumer. They cannot be separated from then on.
+ if (topics != null) {
+ consumer.subscribe(topics, lease);
+ } else {
+ consumer.subscribe(topicPattern, lease);
+ }
} else {
- consumer.subscribe(topicPattern, lease);
+ final List<TopicPartition> topicPartitions = new ArrayList<>();
+
+ for (final String topic : topics) {
+ for (final int partition : partitionsToConsume) {
+ final TopicPartition topicPartition = new TopicPartition(topic, partition);
+ topicPartitions.add(topicPartition);
+ }
+ }
+
+ consumer.assign(topicPartitions);
}
}
lease.setProcessSession(session, processContext);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index b089fce..bdddfad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -356,6 +356,10 @@ public final class KafkaProcessorUtils {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ if (subject.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
}
@@ -401,7 +405,9 @@ public final class KafkaProcessorUtils {
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
- if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+ if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())
+ && !propertyName.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
// or the standard NiFi time period such as "5 secs"
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
index 8f7c4e9..ec2e266 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
@@ -29,6 +29,61 @@
written to a FlowFile by serializing the message with the configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions to consume from are randomly
+ assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+ partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+ if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. In that case, when the Processor is
+ started, it will immediately start to fail, logging errors, and will avoid pulling any data until the Processor is updated to account
+ for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but will not pull data from the newly
+ added partitions. Once stopped and started again, it will begin to error until all partitions have been assigned. Additionally, if partitions that are assigned
+ do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+ to log errors on startup and will not pull data.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+ Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+ partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+ </p>
+
<h2>Security Configuration:</h2>
<p>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
index bd061b2..e7cf092 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
@@ -29,6 +29,61 @@
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions to consume from are randomly
+ assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+ partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+ if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. In that case, when the Processor is
+ started, it will immediately start to fail, logging errors, and will avoid pulling any data until the Processor is updated to account
+ for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but will not pull data from the newly
+ added partitions. Once stopped and started again, it will begin to error until all partitions have been assigned. Additionally, if partitions that are assigned
+ do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+ to log errors on startup and will not pull data.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+ Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+ partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+ </p>
+
<h2>Security Configuration</h2>
<p>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index d006a6e..3414420 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -80,6 +80,7 @@ public class ConsumerPoolTest {
logger,
true,
StandardCharsets.UTF_8,
+ null,
null) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
@@ -99,7 +100,8 @@ public class ConsumerPoolTest {
logger,
true,
StandardCharsets.UTF_8,
- Pattern.compile(".*")) {
+ Pattern.compile(".*"),
+ null) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
new file mode 100644
index 0000000..1053f2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestConsumerPartitionsUtil {
+ private final ComponentLog logger = new MockComponentLogger();
+ private String hostname;
+
+ // Capture the local hostname once per test so that tests can build a "partitions.<hostname>" property for this machine.
+ @Before
+ public void setup() throws UnknownHostException {
+     hostname = InetAddress.getLocalHost().getHostName();
+ }
+
+ // With no property whose name starts with "partitions.", there is no explicit assignment and the lookup yields null.
+ @Test
+ public void testNoPartitionAssignments() throws UnknownHostException {
+     final Map<String, String> unrelatedProperties = Collections.singletonMap("key", "value");
+     assertNull(ConsumerPartitionsUtil.getPartitionsForHost(unrelatedProperties, logger));
+ }
+
+ // A single entry for this host should hand back exactly the partitions that were listed, in the listed order.
+ @Test
+ public void testAllPartitionsAssignedToOneHost() throws UnknownHostException {
+     final Map<String, String> config = new HashMap<>();
+     config.put("key", "value");
+     config.put("partitions." + hostname, "0, 1, 2, 3");
+
+     final int[] assigned = ConsumerPartitionsUtil.getPartitionsForHost(config, logger);
+     assertNotNull(assigned);
+
+     final int[] expected = {0, 1, 2, 3};
+     assertArrayEquals(expected, assigned);
+ }
+
+ // A gap in the assigned partition numbers must fail validation; filling the gap must make the same configuration valid.
+ @Test
+ public void testSomePartitionsSkipped() {
+     final String partitionsProperty = "partitions." + hostname;
+     final Map<String, String> config = new HashMap<>();
+     config.put("key", "value");
+
+     // Partition 4 is skipped.
+     config.put(partitionsProperty, "0, 1, 2, 3, 5");
+     final ValidationResult gapResult = ConsumerPartitionsUtil.validateConsumePartitions(config);
+     assertNotNull(gapResult);
+     assertFalse(gapResult.isValid());
+
+     // Same host, now with partition 4 included (and irregular whitespace between elements).
+     config.put(partitionsProperty, "0, 1,2,3,4, 5");
+     final ValidationResult contiguousResult = ConsumerPartitionsUtil.validateConsumePartitions(config);
+     assertNotNull(contiguousResult);
+     assertTrue(contiguousResult.isValid());
+ }
+
+ // When partitions are assigned only to some other host, validation must fail because this host has no entry.
+ @Test
+ public void testCurrentNodeNotSpecified() {
+     final Map<String, String> config = new HashMap<>();
+     config.put("key", "value");
+     config.put("partitions.other-host", "0, 1, 2, 3");
+
+     final ValidationResult result = ConsumerPartitionsUtil.validateConsumePartitions(config);
+
+     assertNotNull(result);
+     assertFalse(result.isValid());
+ }
+
+ // Partition 2 is claimed by both this host and another host, so validation must fail.
+ @Test
+ public void testPartitionListedTwice() {
+     final Map<String, String> config = new HashMap<>();
+     config.put("key", "value");
+     config.put("partitions." + hostname, "2");
+     config.put("partitions.other-host", "0, 1, 2, 3");
+
+     final ValidationResult result = ConsumerPartitionsUtil.validateConsumePartitions(config);
+
+     assertNotNull(result);
+     assertFalse(result.isValid());
+ }
+
+ // An empty value for this host is a legal way of saying "this node consumes nothing": the configuration is valid
+ // and the host receives an empty, non-null partition array.
+ @Test
+ public void testNodeWithNoAssignment() throws UnknownHostException {
+     final Map<String, String> config = new HashMap<>();
+     config.put("key", "value");
+     config.put("partitions." + hostname, "");
+     config.put("partitions.other-host", "0, 1, 2, 3");
+
+     final ValidationResult validationResult = ConsumerPartitionsUtil.validateConsumePartitions(config);
+     assertNotNull(validationResult);
+     assertTrue(validationResult.isValid());
+
+     final int[] assigned = ConsumerPartitionsUtil.getPartitionsForHost(config, logger);
+     assertNotNull(assigned);
+     assertEquals(0, assigned.length);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 3e7b16a..f6dda5b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
@@ -313,7 +314,26 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- return KafkaProcessorUtils.validateCommonProperties(validationContext);
+ final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+ final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+ validationResults.add(consumerPartitionsResult);
+
+ final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+ if (explicitPartitionMapping) {
+ final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+ if (TOPIC_PATTERN.getValue().equals(topicType)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(TOPIC_TYPE.getDisplayName())
+ .input(TOPIC_PATTERN.getDisplayName())
+ .valid(false)
+ .explanation("It is not valid to explicitly assign topic partitions and also using a Topic Pattern. "
+ + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+ .build());
+ }
+ }
+
+ return validationResults;
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -322,7 +342,24 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
return pool;
}
- return consumerPool = createConsumerPool(context, getLogger());
+ final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+ final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+ if (numAssignedPartitions > 0) {
+ // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+ // all of the partitions assigned.
+ final int partitionCount = consumerPool.getPartitionCount();
+ if (partitionCount != numAssignedPartitions) {
+ context.yield();
+ consumerPool.close();
+
+ throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+ " topic(s) have " + partitionCount + " partitions");
+ }
+ }
+
+ this.consumerPool = consumerPool;
+ return consumerPool;
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -355,6 +392,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ final int[] partitionsToConsume;
+ try {
+ partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+ } catch (final UnknownHostException uhe) {
+ throw new ProcessException("Could not determine localhost's hostname", uhe);
+ }
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -364,11 +408,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 5461abb..5e432b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
@@ -287,7 +288,26 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- return KafkaProcessorUtils.validateCommonProperties(validationContext);
+ final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+ final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+ validationResults.add(consumerPartitionsResult);
+
+ final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+ if (explicitPartitionMapping) {
+ final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+ if (TOPIC_PATTERN.getValue().equals(topicType)) {
+ validationResults.add(new ValidationResult.Builder()
+ .subject(TOPIC_TYPE.getDisplayName())
+ .input(TOPIC_PATTERN.getDisplayName())
+ .valid(false)
+ .explanation("It is not valid to explicitly assign topic partitions and also using a Topic Pattern. "
+ + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+ .build());
+ }
+ }
+
+ return validationResults;
}
private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -296,7 +316,24 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
return pool;
}
- return consumerPool = createConsumerPool(context, getLogger());
+ final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+ final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+ if (numAssignedPartitions > 0) {
+ // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+ // all of the partitions assigned.
+ final int partitionCount = consumerPool.getPartitionCount();
+ if (partitionCount != numAssignedPartitions) {
+ context.yield();
+ consumerPool.close();
+
+ throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+ " topic(s) have " + partitionCount + " partitions");
+ }
+ }
+
+ this.consumerPool = consumerPool;
+ return consumerPool;
}
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -329,6 +366,13 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
+ final int[] partitionsToConsume;
+ try {
+ partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+ } catch (final UnknownHostException uhe) {
+ throw new ProcessException("Could not determine localhost's hostname", uhe);
+ }
+
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
final String trimmedName = topic.trim();
@@ -338,11 +382,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
}
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
new file mode 100644
index 0000000..bf71e78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+ public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+ /**
+  * Determines which Kafka partitions have been explicitly assigned to the host this code is running on.
+  *
+  * @param properties all configured property names and values; only names that start with "partitions." are considered
+  * @param logger the logger used to report the discovered host-to-partition mapping
+  * @return the partitions assigned to this host, or {@code null} if no explicit partition assignment has been configured
+  * @throws UnknownHostException if the local host cannot be resolved
+  * @throws IllegalArgumentException if assignments exist but none matches this host, or if a partition value is negative or not an integer
+  */
+ public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException {
+     final Map<String, String> partitionStringsByHost = mapHostnamesToPartitionStrings(properties);
+     final Map<String, int[]> assignmentsByHost = mapPartitionValueToIntArrays(partitionStringsByHost);
+
+     if (!assignmentsByHost.isEmpty()) {
+         logger.info("Found the following mapping of hosts to partitions: {}", new Object[] {partitionStringsByHost});
+
+         // Look this host up by its canonical hostname, its hostname, or its IP address.
+         final int[] localAssignment = getPartitionsForThisHost(assignmentsByHost);
+         if (localAssignment != null) {
+             return localAssignment;
+         }
+
+         throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
+     }
+
+     // Explicit partitioning is not enabled.
+     logger.debug("No explicit Consumer Partitions have been declared.");
+     return null;
+ }
+
+ /**
+  * Parses each host's comma-separated partition list into an array of partition numbers.
+  *
+  * @param partitionValues a mapping of hostname to the raw, comma-separated partition list configured for that host
+  * @return a mapping of hostname to the parsed partition numbers
+  * @throws IllegalArgumentException if any value contains an element that is negative or not an integer
+  */
+ private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) {
+     final Map<String, int[]> parsedByHost = new HashMap<>();
+     partitionValues.forEach((hostname, partitionList) -> parsedByHost.put(hostname, parsePartitions(hostname, partitionList)));
+     return parsedByHost;
+ }
+
+ /**
+  * Finds the partitions mapped to the local host, trying the canonical hostname first, then the hostname, then the IP address.
+  * Each name is resolved only if the previous one produced no match.
+  *
+  * @param partitionsByHost a mapping of configured host identifier to its assigned partitions
+  * @return the partitions for this host, or {@code null} if none of the three identifiers is present in the mapping
+  * @throws UnknownHostException if the local host cannot be resolved
+  */
+ private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException {
+     final InetAddress localAddress = InetAddress.getLocalHost();
+
+     final int[] byCanonicalName = partitionsByHost.get(localAddress.getCanonicalHostName());
+     if (byCanonicalName != null) {
+         return byCanonicalName;
+     }
+
+     final int[] byHostName = partitionsByHost.get(localAddress.getHostName());
+     return byHostName != null ? byHostName : partitionsByHost.get(localAddress.getHostAddress());
+ }
+
+ /**
+  * Extracts the explicit partition assignments from the full set of properties.
+  *
+  * @param properties all configured property names and values
+  * @return a mapping of the text following the "partitions." prefix (the host identifier) to the property's raw value;
+  *         properties without the prefix, or with nothing after it, are omitted
+  */
+ private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) {
+     final int prefixLength = PARTITION_PROPERTY_NAME_PREFIX.length();
+     final Map<String, String> partitionStringsByHost = new HashMap<>();
+
+     for (final Map.Entry<String, String> property : properties.entrySet()) {
+         final String name = property.getKey();
+
+         // Keep only names that carry a non-empty host identifier after the prefix.
+         if (name.startsWith(PARTITION_PROPERTY_NAME_PREFIX) && name.length() > prefixLength) {
+             partitionStringsByHost.put(name.substring(prefixLength), property.getValue());
+         }
+     }
+
+     return partitionStringsByHost;
+ }
+
+ /**
+  * Parses a comma-separated list of partition numbers, ignoring blank elements.
+  *
+  * @param hostname the host identifier the value was configured for; used only in error messages
+  * @param propertyValue the comma-separated partition list, e.g. "0, 3, 6"
+  * @return the parsed partitions in the order listed; an empty array if the value has no non-blank elements
+  * @throws IllegalArgumentException if an element is negative or is not an integer
+  */
+ private static int[] parsePartitions(final String hostname, final String propertyValue) {
+     final String[] splits = propertyValue.split(",");
+     final List<Integer> partitionList = new ArrayList<>();
+     for (final String split : splits) {
+         final String trimmed = split.trim();
+         if (trimmed.isEmpty()) {
+             continue;
+         }
+
+         final int partition;
+         try {
+             partition = Integer.parseInt(trimmed);
+         } catch (final NumberFormatException nfe) {
+             // Keep the parse failure as the cause so that it is not lost when the exception is translated.
+             throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer", nfe);
+         }
+
+         if (partition < 0) {
+             throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative");
+         }
+
+         partitionList.add(partition);
+     }
+
+     // Convert the List<Integer> to an int[]
+     return partitionList.stream().mapToInt(Integer::intValue).toArray();
+ }
+
+ public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
+ final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+ if (hostnameToPartitionMapping.isEmpty()) {
+ // Partitions are not being explicitly assigned.
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
+ final Set<Integer> partitionsClaimed = new HashSet<>();
+ final Set<Integer> duplicatePartitions = new HashSet<>();
+ for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) {
+ final int[] partitions = parsePartitions(entry.getKey(), entry.getValue());
+ for (final int partition : partitions) {
+ final boolean added = partitionsClaimed.add(partition);
+ if (!added) {
+ duplicatePartitions.add(partition);
+ }
+ }
+ }
+
+ final List<Integer> partitionsMissing = new ArrayList<>();
+ for (int i=0; i < partitionsClaimed.size(); i++) {
+ if (!partitionsClaimed.contains(i)) {
+ partitionsMissing.add(i);
+ }
+ }
+
+ if (!partitionsMissing.isEmpty()) {
+ return new ValidationResult.Builder()
+ .subject("Partitions")
+ .input(partitionsClaimed.toString())
+ .valid(false)
+ .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
+ .build();
+ }
+
+ if (!duplicatePartitions.isEmpty()) {
+ return new ValidationResult.Builder()
+ .subject("Partitions")
+ .input(partitionsClaimed.toString())
+ .valid(false)
+ .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
+ .build();
+ }
+
+ final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+ final int[] partitionsForThisHost;
+ try {
+ partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+ } catch (UnknownHostException e) {
+ return new ValidationResult.Builder()
+ .valid(false)
+ .subject("Partition Assignment")
+ .explanation("Unable to determine hostname of localhost")
+ .build();
+ }
+
+ if (partitionsForThisHost == null) {
+ return new ValidationResult.Builder()
+ .subject("Partition Assignment")
+ .valid(false)
+ .explanation("No assignment was given for this host")
+ .build();
+ }
+
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
+ /**
+  * Indicates whether any explicit partition assignment has been configured.
+  *
+  * @param properties all configured property names and values
+  * @return {@code true} if at least one property name consists of the "partitions." prefix followed by a host identifier
+  */
+ public static boolean isPartitionAssignmentExplicit(final Map<String, String> properties) {
+     return !mapHostnamesToPartitionStrings(properties).isEmpty();
+ }
+
+ /**
+  * Counts the partitions that have been explicitly assigned across all hosts.
+  *
+  * @param properties all configured property names and values
+  * @return the sum of the number of partitions listed for each host; 0 if no explicit assignment has been configured
+  * @throws IllegalArgumentException if any partition value is negative or not an integer
+  */
+ public static int getPartitionAssignmentCount(final Map<String, String> properties) {
+     final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(mapHostnamesToPartitionStrings(properties));
+     return partitionsByHost.values().stream()
+         .mapToInt(assigned -> assigned.length)
+         .sum();
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 2a33298..46bf97a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -60,6 +62,7 @@ public class ConsumerPool implements Closeable {
private final Charset headerCharacterSet;
private final Pattern headerNamePattern;
private final boolean separateByKey;
+ private final int[] partitionsToConsume;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -97,7 +100,8 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -114,6 +118,7 @@ public class ConsumerPool implements Closeable {
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -129,7 +134,8 @@ public class ConsumerPool implements Closeable {
final ComponentLog logger,
final boolean honorTransactions,
final Charset headerCharacterSet,
- final Pattern headerNamePattern) {
+ final Pattern headerNamePattern,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -146,6 +152,7 @@ public class ConsumerPool implements Closeable {
this.headerCharacterSet = headerCharacterSet;
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -162,7 +169,8 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
- final String keyEncoding) {
+ final String keyEncoding,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -179,6 +187,7 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
+ this.partitionsToConsume = partitionsToConsume;
}
public ConsumerPool(
@@ -195,7 +204,8 @@ public class ConsumerPool implements Closeable {
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
- final String keyEncoding) {
+ final String keyEncoding,
+ final int[] partitionsToConsume) {
this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -212,6 +222,29 @@ public class ConsumerPool implements Closeable {
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.keyEncoding = keyEncoding;
+ this.partitionsToConsume = partitionsToConsume;
+ }
+
+ public int getPartitionCount() {
+ // No explicit topic names (e.g. a topic regex is in use): a common partition count cannot be determined, so return -1
+ if (topics == null || topics.isEmpty()) {
+ return -1;
+ }
+
+ int partitionsEachTopic = -1; // -1 = no topic inspected yet, so a first topic reporting 0 partitions is still compared
+ try (final Consumer<byte[], byte[]> consumer = createKafkaConsumer()) {
+ for (final String topicName : topics) {
+ final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
+ final int partitionsThisTopic = partitionInfos == null ? 0 : partitionInfos.size(); // defensive: null is treated as no partitions
+ if (partitionsEachTopic != -1 && partitionsThisTopic != partitionsEachTopic) {
+ throw new IllegalStateException("The specific topic names do not have the same number of partitions");
+ }
+
+ partitionsEachTopic = partitionsThisTopic;
+ }
+ }
+
+ return partitionsEachTopic;
+ }
/**
@@ -239,14 +272,26 @@ public class ConsumerPool implements Closeable {
* sitting idle which could prompt excessive rebalances.
*/
lease = new SimpleConsumerLease(consumer);
- /**
- * This subscription tightly couples the lease to the given
- * consumer. They cannot be separated from then on.
- */
- if (topics != null) {
- consumer.subscribe(topics, lease);
+
+ if (partitionsToConsume == null) {
+ // This subscription tightly couples the lease to the given
+ // consumer. They cannot be separated from then on.
+ if (topics != null) {
+ consumer.subscribe(topics, lease);
+ } else {
+ consumer.subscribe(topicPattern, lease);
+ }
} else {
- consumer.subscribe(topicPattern, lease);
+ final List<TopicPartition> topicPartitions = new ArrayList<>();
+
+ for (final String topic : topics) {
+ for (final int partition : partitionsToConsume) {
+ final TopicPartition topicPartition = new TopicPartition(topic, partition);
+ topicPartitions.add(topicPartition);
+ }
+ }
+
+ consumer.assign(topicPartitions);
}
}
lease.setProcessSession(session, processContext);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 44a6984..bdddfad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -50,6 +50,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import java.util.regex.Pattern;
public final class KafkaProcessorUtils {
private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
@@ -62,6 +63,8 @@ public final class KafkaProcessorUtils {
static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute",
"The key will not be added as an Attribute");
+ static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
static final String KAFKA_KEY = "kafka.key";
static final String KAFKA_TOPIC = "kafka.topic";
static final String KAFKA_PARTITION = "kafka.partition";
@@ -353,6 +356,10 @@ public final class KafkaProcessorUtils {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ if (subject.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+ return new ValidationResult.Builder().valid(true).build();
+ }
+
final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
}
@@ -398,7 +405,9 @@ public final class KafkaProcessorUtils {
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
: context.getProperty(propertyDescriptor).getValue();
- if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+ if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())
+ && !propertyName.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+
// If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
// or the standard NiFi time period such as "5 secs"
if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
index 320d36e..1150e9c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
@@ -29,6 +29,61 @@
written to a FlowFile by serializing the message with the configured Record Writer.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions to consume from are randomly
+ assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+ partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+ if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. In that case, when the
+ Processor is started, it will immediately start to fail, logging errors, and avoid pulling any data until the Processor is updated to account
+ for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but not pull data from the newly
+ added partitions. Once stopped and started again, it will begin to error until all partitions have been assigned. Additionally, if partitions that are assigned
+ do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+ to log errors on startup and will not pull data.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+ Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+ partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+ </p>
+
<h2>Security Configuration:</h2>
<p>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
index efb56c0..be2f380 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
@@ -29,6 +29,61 @@
of the Kafka message.
</p>
+ <h2>Consumer Partition Assignment</h2>
+ <p>
+ By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions to consume from are randomly
+ assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+ NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+ Node 3 will then be assigned partitions 6 and 7.
+ </p>
+
+ <p>
+ In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+ For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+ where this is undesirable.
+ </p>
+
+ <p>
+ One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
+ has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+ 15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
+ Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+ it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+ </p>
+
+ <p>
+ The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+ Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+ using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+ are handled.
+ </p>
+
+ <p>
+ In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+ <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+ <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+ The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+ the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+ added for the hostname with an empty string as the value.
+ </p>
+
+ <p>
+ NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+ partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+ if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. In that case, when the
+ Processor is started, it will immediately start to fail, logging errors, and avoid pulling any data until the Processor is updated to account
+ for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but not pull data from the newly
+ added partitions. Once stopped and started again, it will begin to error until all partitions have been assigned. Additionally, if partitions that are assigned
+ do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+ to log errors on startup and will not pull data.
+ </p>
+
+ <p>
+ In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+ Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+ partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+ </p>
+
<h2>Security Configuration</h2>
<p>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index d006a6e..18e188c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.junit.Before;
@@ -80,6 +78,7 @@ public class ConsumerPoolTest {
logger,
true,
StandardCharsets.UTF_8,
+ null,
null) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
@@ -99,7 +98,8 @@ public class ConsumerPoolTest {
logger,
true,
StandardCharsets.UTF_8,
- Pattern.compile(".*")) {
+ Pattern.compile(".*"),
+ null) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return consumer;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
new file mode 100644
index 0000000..1053f2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestConsumerPartitionsUtil { // exercises parsing and validation of the "partitions.<hostname>" properties
+ private final ComponentLog logger = new MockComponentLogger();
+ private String hostname; // local host name, used to build the property key for this machine
+
+ @Before
+ public void setup() throws UnknownHostException {
+ hostname = InetAddress.getLocalHost().getHostName();
+ }
+
+ @Test
+ public void testNoPartitionAssignments() throws UnknownHostException {
+ final Map<String, String> properties = Collections.singletonMap("key", "value"); // no "partitions." key present
+ final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+ assertNull(partitions);
+ }
+
+ @Test
+ public void testAllPartitionsAssignedToOneHost() throws UnknownHostException {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("key", "value");
+ properties.put("partitions." + hostname, "0, 1, 2, 3");
+ final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+ assertNotNull(partitions);
+
+ assertArrayEquals(new int[] {0, 1, 2, 3}, partitions);
+ }
+
+ @Test
+ public void testSomePartitionsSkipped() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("key", "value");
+ properties.put("partitions." + hostname, "0, 1, 2, 3, 5"); // partition 4 is skipped
+ final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+ assertNotNull(invalidResult);
+ assertFalse(invalidResult.isValid());
+
+ properties.put("partitions." + hostname, "0, 1,2,3,4, 5"); // contiguous 0-5, with irregular spacing
+ final ValidationResult validResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+ assertNotNull(validResult);
+ assertTrue(validResult.isValid());
+ }
+
+ @Test
+ public void testCurrentNodeNotSpecified() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("key", "value");
+ properties.put("partitions.other-host", "0, 1, 2, 3"); // no entry for the local host
+
+ final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+ assertNotNull(invalidResult);
+ assertFalse(invalidResult.isValid());
+ }
+
+ @Test
+ public void testPartitionListedTwice() {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("key", "value");
+ properties.put("partitions." + hostname, "2"); // partition 2 is also listed for other-host below
+ properties.put("partitions.other-host", "0, 1, 2, 3");
+
+ final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+ assertNotNull(invalidResult);
+ assertFalse(invalidResult.isValid());
+ }
+
+ @Test
+ public void testNodeWithNoAssignment() throws UnknownHostException {
+ final Map<String, String> properties = new HashMap<>();
+ properties.put("key", "value");
+ properties.put("partitions." + hostname, ""); // empty value: this host is listed but owns no partitions
+ properties.put("partitions.other-host", "0, 1, 2, 3");
+
+ final ValidationResult validResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+ assertNotNull(validResult);
+ assertTrue(validResult.isValid());
+
+ final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+ assertNotNull(partitions);
+ assertEquals(0, partitions.length);
+ }
+
+}