Posted to commits@nifi.apache.org by pv...@apache.org on 2020/11/18 15:03:36 UTC

[nifi] branch main updated: NIFI-8021: Provide the ability to pin partitions to particular hosts when using ConsumeKafka processors

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7848ba5  NIFI-8021: Provide the ability to pin partitions to particular hosts when using ConsumeKafka processors
7848ba5 is described below

commit 7848ba59a250ba4ae98d7062869bcf01a9a58b71
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Nov 17 17:27:39 2020 -0500

    NIFI-8021: Provide the ability to pin partitions to particular hosts when using ConsumeKafka processors
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4672.
---
 .../kafka/pubsub/ConsumeKafkaRecord_2_0.java       |  52 ++++-
 .../processors/kafka/pubsub/ConsumeKafka_2_0.java  |  55 +++++-
 .../kafka/pubsub/ConsumerPartitionsUtil.java       | 209 +++++++++++++++++++++
 .../nifi/processors/kafka/pubsub/ConsumerPool.java |  64 ++++++-
 .../kafka/pubsub/KafkaProcessorUtils.java          |   8 +-
 .../additionalDetails.html                         |  55 ++++++
 .../additionalDetails.html                         |  55 ++++++
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |   4 +-
 .../kafka/pubsub/TestConsumerPartitionsUtil.java   | 120 ++++++++++++
 .../kafka/pubsub/ConsumeKafkaRecord_2_6.java       |  52 ++++-
 .../processors/kafka/pubsub/ConsumeKafka_2_6.java  |  52 ++++-
 .../kafka/pubsub/ConsumerPartitionsUtil.java       | 209 +++++++++++++++++++++
 .../nifi/processors/kafka/pubsub/ConsumerPool.java |  67 +++++--
 .../kafka/pubsub/KafkaProcessorUtils.java          |  11 +-
 .../additionalDetails.html                         |  55 ++++++
 .../additionalDetails.html                         |  55 ++++++
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |   6 +-
 .../kafka/pubsub/TestConsumerPartitionsUtil.java   | 120 ++++++++++++
 18 files changed, 1206 insertions(+), 43 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index 79824b1..f279763 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -313,7 +314,26 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+        final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+        validationResults.add(consumerPartitionsResult);
+
+        final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+        if (explicitPartitionMapping) {
+            final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+            if (TOPIC_PATTERN.getValue().equals(topicType)) {
+                validationResults.add(new ValidationResult.Builder()
+                    .subject(TOPIC_TYPE.getDisplayName())
+                    .input(TOPIC_PATTERN.getDisplayName())
+                    .valid(false)
+                    .explanation("It is not valid to explicitly assign Topic Partitions and also use a Topic Pattern. "
+                        + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+                    .build());
+            }
+        }
+
+        return validationResults;
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -322,7 +342,24 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             return pool;
         }
 
-        return consumerPool = createConsumerPool(context, getLogger());
+        final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+        final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+        if (numAssignedPartitions > 0) {
+            // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+            // all of the partitions assigned.
+            final int partitionCount = consumerPool.getPartitionCount();
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+                consumerPool.close();
+
+                throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+                    " topic(s) have " + partitionCount + " partitions");
+            }
+        }
+
+        this.consumerPool = consumerPool;
+        return consumerPool;
     }
 
     protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -355,6 +392,13 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
 
+        final int[] partitionsToConsume;
+        try {
+            partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+        } catch (final UnknownHostException uhe) {
+            throw new ProcessException("Could not determine localhost's hostname", uhe);
+        }
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -364,11 +408,11 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index fc00693..a25f0b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -77,7 +78,6 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 public class ConsumeKafka_2_0 extends AbstractProcessor {
-
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -286,7 +286,26 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+        final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+        validationResults.add(consumerPartitionsResult);
+
+        final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+        if (explicitPartitionMapping) {
+            final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+            if (TOPIC_PATTERN.getValue().equals(topicType)) {
+                validationResults.add(new ValidationResult.Builder()
+                    .subject(TOPIC_TYPE.getDisplayName())
+                    .input(TOPIC_PATTERN.getDisplayName())
+                    .valid(false)
+                    .explanation("It is not valid to explicitly assign Topic Partitions and also use a Topic Pattern. "
+                        + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+                    .build());
+            }
+        }
+
+        return validationResults;
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -295,7 +314,24 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
             return pool;
         }
 
-        return consumerPool = createConsumerPool(context, getLogger());
+        final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+        final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+        if (numAssignedPartitions > 0) {
+            // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+            // all of the partitions assigned.
+            final int partitionCount = consumerPool.getPartitionCount();
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+                consumerPool.close();
+
+                throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+                    " topic(s) have " + partitionCount + " partitions");
+            }
+        }
+
+        this.consumerPool = consumerPool;
+        return consumerPool;
     }
 
     protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -328,6 +364,13 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
 
         final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
 
+        final int[] partitionsToConsume;
+        try {
+            partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+        } catch (final UnknownHostException uhe) {
+            throw new ProcessException("Could not determine localhost's hostname", uhe);
+        }
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -337,11 +380,11 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
@@ -379,6 +422,8 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         activeLeases.clear();
     }
 
+
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
         final ConsumerPool pool = getConsumerPool(context);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
new file mode 100644
index 0000000..bf71e78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+    public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+    public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException {
+        final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString);
+
+        if (partitionsByHost.isEmpty()) {
+            // Explicit partitioning is not enabled.
+            logger.debug("No explicit Consumer Partitions have been declared.");
+            return null;
+        }
+
+        logger.info("Found the following mapping of hosts to partitions: {}", new Object[] {hostnameToPartitionString});
+
+        // Determine the partitions based on hostname/IP.
+        int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+        if (partitionsForThisHost == null) {
+            throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
+        }
+
+        return partitionsForThisHost;
+    }
+
+    private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) {
+        final Map<String, int[]> partitionsByHost = new HashMap<>();
+        for (final Map.Entry<String, String> entry : partitionValues.entrySet()) {
+            final String host = entry.getKey();
+            final int[] partitions = parsePartitions(host, entry.getValue());
+            partitionsByHost.put(host, partitions);
+        }
+
+        return partitionsByHost;
+    }
+
+    private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException {
+        // Determine the partitions based on hostname/IP.
+        final InetAddress localhost = InetAddress.getLocalHost();
+        int[] partitionsForThisHost = partitionsByHost.get(localhost.getCanonicalHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        partitionsForThisHost = partitionsByHost.get(localhost.getHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        return partitionsByHost.get(localhost.getHostAddress());
+    }
+
+    private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionString = new HashMap<>();
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String propertyName = entry.getKey();
+            if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) {
+                continue;
+            }
+
+            if (propertyName.length() <= PARTITION_PROPERTY_NAME_PREFIX.length()) {
+                continue;
+            }
+
+            final String propertyNameAfterPrefix = propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length());
+            hostnameToPartitionString.put(propertyNameAfterPrefix, entry.getValue());
+        }
+
+        return hostnameToPartitionString;
+    }
+
+    private static int[] parsePartitions(final String hostname, final String propertyValue) {
+        final String[] splits = propertyValue.split(",");
+        final List<Integer> partitionList = new ArrayList<>();
+        for (final String split : splits) {
+            if (split.trim().isEmpty()) {
+                continue;
+            }
+
+            try {
+                final int partition = Integer.parseInt(split.trim());
+                if (partition < 0) {
+                    throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative");
+                }
+
+                partitionList.add(partition);
+            } catch (final NumberFormatException nfe) {
+                throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer");
+            }
+        }
+
+        // Map out List<Integer> to int[]
+        return partitionList.stream().mapToInt(Integer::intValue).toArray();
+    }
+
+    public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        if (hostnameToPartitionMapping.isEmpty()) {
+            // Partitions are not being explicitly assigned.
+            return new ValidationResult.Builder().valid(true).build();
+        }
+
+        final Set<Integer> partitionsClaimed = new HashSet<>();
+        final Set<Integer> duplicatePartitions = new HashSet<>();
+        for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) {
+            final int[] partitions = parsePartitions(entry.getKey(), entry.getValue());
+            for (final int partition : partitions) {
+                final boolean added = partitionsClaimed.add(partition);
+                if (!added) {
+                    duplicatePartitions.add(partition);
+                }
+            }
+        }
+
+        final List<Integer> partitionsMissing = new ArrayList<>();
+        for (int i=0; i < partitionsClaimed.size(); i++) {
+            if (!partitionsClaimed.contains(i)) {
+                partitionsMissing.add(i);
+            }
+        }
+
+        if (!partitionsMissing.isEmpty()) {
+            return new ValidationResult.Builder()
+                .subject("Partitions")
+                .input(partitionsClaimed.toString())
+                .valid(false)
+                .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
+                .build();
+        }
+
+        if (!duplicatePartitions.isEmpty()) {
+            return new ValidationResult.Builder()
+                .subject("Partitions")
+                .input(partitionsClaimed.toString())
+                .valid(false)
+                .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
+                .build();
+        }
+
+        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+        final int[] partitionsForThisHost;
+        try {
+            partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+        } catch (UnknownHostException e) {
+            return new ValidationResult.Builder()
+                .valid(false)
+                .subject("Partition Assignment")
+                .explanation("Unable to determine hostname of localhost")
+                .build();
+        }
+
+        if (partitionsForThisHost == null) {
+            return new ValidationResult.Builder()
+                .subject("Partition Assignment")
+                .valid(false)
+                .explanation("No assignment was given for this host")
+                .build();
+        }
+
+        return new ValidationResult.Builder().valid(true).build();
+    }
+
+    public static boolean isPartitionAssignmentExplicit(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        return !hostnameToPartitionMapping.isEmpty();
+    }
+
+    public static int getPartitionAssignmentCount(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitions = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+
+        int count = 0;
+        for (final int[] partitionArray : partitions.values()) {
+            count += partitionArray.length;
+        }
+
+        return count;
+    }
+}
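The new ConsumerPartitionsUtil class above maps dynamic partitions.<hostname> properties to per-host partition lists and resolves the list for the local node. A minimal usage sketch, assuming hypothetical property values and borrowing the MockComponentLogger used by the unit tests later in this commit:

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.mock.MockComponentLogger;

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ConsumerPartitionsUtilSketch {
        public static void main(final String[] args) throws UnknownHostException {
            // Hypothetical dynamic properties; "other-node" is a made-up second host.
            final Map<String, String> properties = new HashMap<>();
            properties.put("partitions." + InetAddress.getLocalHost().getHostName(), "0, 2");
            properties.put("partitions.other-node", "1, 3");

            final ComponentLog logger = new MockComponentLogger();

            // Resolves to {0, 2} on this machine; would be null if no partitions.* properties were set.
            final int[] partitionsForThisHost = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);

            // Total number of partitions statically assigned across all nodes (4 here).
            final int assignedCount = ConsumerPartitionsUtil.getPartitionAssignmentCount(properties);

            System.out.println(Arrays.toString(partitionsForThisHost) + " of " + assignedCount + " assigned partitions");
        }
    }

The processors call getPartitionsForHost() from createConsumerPool() and getPartitionAssignmentCount() from getConsumerPool(), as shown in the hunks above.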
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 0462729..99293c6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -60,6 +62,7 @@ public class ConsumerPool implements Closeable {
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
     private final boolean separateByKey;
+    private final int[] partitionsToConsume;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -97,7 +100,8 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -114,6 +118,7 @@ public class ConsumerPool implements Closeable {
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -129,7 +134,8 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -146,6 +152,7 @@ public class ConsumerPool implements Closeable {
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -162,7 +169,8 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final boolean separateByKey,
-            final String keyEncoding) {
+            final String keyEncoding,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -179,6 +187,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -195,7 +204,8 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final boolean separateByKey,
-            final String keyEncoding) {
+            final String keyEncoding,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -212,6 +222,29 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
+        this.partitionsToConsume = partitionsToConsume;
+    }
+
+    public int getPartitionCount() {
+        // If using regex for topic names, just return -1
+        if (topics == null || topics.isEmpty()) {
+            return -1;
+        }
+
+        int partitionsEachTopic = 0;
+        try (final Consumer<byte[], byte[]> consumer = createKafkaConsumer()) {
+            for (final String topicName : topics) {
+                final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
+                final int partitionsThisTopic = partitionInfos.size();
+                if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
+                    throw new IllegalStateException("The specified topic names do not have the same number of partitions");
+                }
+
+                partitionsEachTopic = partitionsThisTopic;
+            }
+        }
+
+        return partitionsEachTopic;
     }
 
     /**
@@ -241,12 +274,25 @@ public class ConsumerPool implements Closeable {
              */
             lease = new SimpleConsumerLease(consumer);
 
-            // This subscription tightly couples the lease to the given
-            // consumer. They cannot be separated from then on.
-            if (topics != null) {
-              consumer.subscribe(topics, lease);
+            if (partitionsToConsume == null) {
+                // This subscription tightly couples the lease to the given
+                // consumer. They cannot be separated from then on.
+                if (topics != null) {
+                    consumer.subscribe(topics, lease);
+                } else {
+                    consumer.subscribe(topicPattern, lease);
+                }
             } else {
-              consumer.subscribe(topicPattern, lease);
+                final List<TopicPartition> topicPartitions = new ArrayList<>();
+
+                for (final String topic : topics) {
+                    for (final int partition : partitionsToConsume) {
+                        final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                        topicPartitions.add(topicPartition);
+                    }
+                }
+
+                consumer.assign(topicPartitions);
             }
         }
         lease.setProcessSession(session, processContext);
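The behavioral core of the ConsumerPool change is that a pinned partition list switches the consumer from group subscription to explicit assignment. A stripped-down sketch of that distinction against the plain kafka-clients API (topic names and partitions are illustrative; the real pool also registers the lease as a rebalance listener when subscribing):

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.ArrayList;
    import java.util.List;

    final class AssignmentSketch {
        // With a null partition list, the consumer joins the consumer group and the broker balances
        // partitions across all consumers; with a pinned list, the named partitions are assigned
        // directly, so a rebalance can never move them to another node.
        static void subscribeOrAssign(final Consumer<byte[], byte[]> consumer, final List<String> topics, final int[] partitionsToConsume) {
            if (partitionsToConsume == null) {
                consumer.subscribe(topics);
            } else {
                final List<TopicPartition> topicPartitions = new ArrayList<>();
                for (final String topic : topics) {
                    for (final int partition : partitionsToConsume) {
                        topicPartitions.add(new TopicPartition(topic, partition));
                    }
                }
                consumer.assign(topicPartitions);
            }
        }
    }

The new getPartitionCount() method uses Consumer.partitionsFor(topic) to discover how many partitions each topic has, which is what lets the processors verify that a static assignment covers every partition before the pool is used.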
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index b089fce..bdddfad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -356,6 +356,10 @@ public final class KafkaProcessorUtils {
 
         @Override
         public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (subject.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+                return new ValidationResult.Builder().valid(true).build();
+            }
+
             final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
             return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
         }
@@ -401,7 +405,9 @@ public final class KafkaProcessorUtils {
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
 
-            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())
+                && !propertyName.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+
                 // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
                 // or the standard NiFi time period such as "5 secs"
                 if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
index 8f7c4e9..ec2e266 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0/additionalDetails.html
@@ -29,6 +29,61 @@
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions of those topics are randomly
+            assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario, and suppose that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, which takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 are reassigned to the other nodes, and those nodes proceed to pull data from Kafka and
+            deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and delivers the 1,000 messages that it had already pulled from
+            Kafka to the destination system. Those records have now been delivered out of order.
+        </p>
+
+        <p>
+            The solution, then, is to assign partitions statically instead of dynamically. In this way, Partitions 6 and 7 can be assigned to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7; that data will remain queued in Kafka until Node 3 comes
+            back online. With this approach, the data that was already pulled can be processed (assuming First In First Out Prioritizers are used) before newer
+            messages are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+            partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. If partitions
+            0, 1, and 2 are assigned, the Processor will be valid, even if there are 4 partitions on the Topic. In that case, when the Processor is started,
+            it will immediately begin to fail, logging errors and pulling no data, until it is updated to account for all partitions. Once running, if the
+            number of partitions is changed, the Processor will continue to run but will not pull data from the newly added partitions. Once stopped, it will
+            produce errors until all partitions have been assigned. Additionally, if partitions that are assigned do not exist (e.g., partitions 0, 1, 2, 3,
+            4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), the Processor will log errors on startup and will not pull data.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+            Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+            partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+        </p>
+
 
         <h2>Security Configuration:</h2>
         <p>
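The validation rules described above (no partition skipped, no partition mapped to more than one node) are enforced by ConsumerPartitionsUtil.validateConsumePartitions(). A small sketch of how misconfigurations surface, using hypothetical node names:

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.components.ValidationResult;

    import java.util.HashMap;
    import java.util.Map;

    final class PartitionValidationSketch {
        public static void main(final String[] args) {
            // Partition 2 is not mapped to any node -> invalid (skipped partition).
            final Map<String, String> skipped = new HashMap<>();
            skipped.put("partitions.nifi-01", "0, 1");
            skipped.put("partitions.nifi-02", "3");
            final ValidationResult skippedResult = ConsumerPartitionsUtil.validateConsumePartitions(skipped);
            System.out.println(skippedResult.isValid());    // false

            // Partition 1 is mapped to two nodes -> invalid (duplicate partition).
            final Map<String, String> duplicated = new HashMap<>();
            duplicated.put("partitions.nifi-01", "0, 1");
            duplicated.put("partitions.nifi-02", "1, 2");
            final ValidationResult duplicatedResult = ConsumerPartitionsUtil.validateConsumePartitions(duplicated);
            System.out.println(duplicatedResult.isValid()); // false
        }
    }

A well-formed mapping must also include an entry (possibly empty) for the local node's hostname or IP address; otherwise validation fails with "No assignment was given for this host".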
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
index bd061b2..e7cf092 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0/additionalDetails.html
@@ -29,6 +29,61 @@
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions of those topics are randomly
+            assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario, and suppose that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, which takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 are reassigned to the other nodes, and those nodes proceed to pull data from Kafka and
+            deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and delivers the 1,000 messages that it had already pulled from
+            Kafka to the destination system. Those records have now been delivered out of order.
+        </p>
+
+        <p>
+            The solution, then, is to assign partitions statically instead of dynamically. In this way, Partitions 6 and 7 can be assigned to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7; that data will remain queued in Kafka until Node 3 comes
+            back online. With this approach, the data that was already pulled can be processed (assuming First In First Out Prioritizers are used) before newer
+            messages are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+            partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. If partitions
+            0, 1, and 2 are assigned, the Processor will be valid, even if there are 4 partitions on the Topic. In that case, when the Processor is started,
+            it will immediately begin to fail, logging errors and pulling no data, until it is updated to account for all partitions. Once running, if the
+            number of partitions is changed, the Processor will continue to run but will not pull data from the newly added partitions. Once stopped, it will
+            produce errors until all partitions have been assigned. Additionally, if partitions that are assigned do not exist (e.g., partitions 0, 1, 2, 3,
+            4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), the Processor will log errors on startup and will not pull data.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+            Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+            partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+        </p>
+
 
         <h2>Security Configuration</h2>
         <p>
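As the documentation notes, a node can be deliberately excluded by giving it an empty partition list. The following sketch mirrors the testNodeWithNoAssignment case from the new unit tests, with "other-host" standing in for the rest of the cluster:

    package org.apache.nifi.processors.kafka.pubsub;

    import org.apache.nifi.mock.MockComponentLogger;

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;

    final class EmptyAssignmentSketch {
        public static void main(final String[] args) throws UnknownHostException {
            final Map<String, String> properties = new HashMap<>();
            // This node is explicitly assigned nothing; a hypothetical other node takes every partition.
            properties.put("partitions." + InetAddress.getLocalHost().getHostName(), "");
            properties.put("partitions.other-host", "0, 1, 2, 3");

            // Still valid: every node has an entry and no partition is skipped or duplicated.
            System.out.println(ConsumerPartitionsUtil.validateConsumePartitions(properties).isValid()); // true

            // Resolves to an empty array, so this node simply does not consume.
            final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, new MockComponentLogger());
            System.out.println(partitions.length); // 0
        }
    }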
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index d006a6e..3414420 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -80,6 +80,7 @@ public class ConsumerPoolTest {
                 logger,
                 true,
                 StandardCharsets.UTF_8,
+                null,
                 null) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
@@ -99,7 +100,8 @@ public class ConsumerPoolTest {
                 logger,
                 true,
                 StandardCharsets.UTF_8,
-                Pattern.compile(".*")) {
+                Pattern.compile(".*"),
+                null) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
new file mode 100644
index 0000000..1053f2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestConsumerPartitionsUtil {
+    private final ComponentLog logger = new MockComponentLogger();
+    private String hostname;
+
+    @Before
+    public void setup() throws UnknownHostException {
+        hostname = InetAddress.getLocalHost().getHostName();
+    }
+
+    @Test
+    public void testNoPartitionAssignments() throws UnknownHostException {
+        final Map<String, String> properties = Collections.singletonMap("key", "value");
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNull(partitions);
+    }
+
+    @Test
+    public void testAllPartitionsAssignedToOneHost() throws UnknownHostException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "0, 1, 2, 3");
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNotNull(partitions);
+
+        assertArrayEquals(new int[] {0, 1, 2, 3}, partitions);
+    }
+
+    @Test
+    public void testSomePartitionsSkipped() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "0, 1, 2, 3, 5");
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+
+        properties.put("partitions." + hostname, "0, 1,2,3,4, 5");
+        final ValidationResult validResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(validResult);
+        assertTrue(validResult.isValid());
+    }
+
+    @Test
+    public void testCurrentNodeNotSpecified() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+    }
+
+    @Test
+    public void testPartitionListedTwice() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "2");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+    }
+
+    @Test
+    public void testNodeWithNoAssignment() throws UnknownHostException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertTrue(invalidResult.isValid());
+
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNotNull(partitions);
+        assertEquals(0, partitions.length);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 3e7b16a..f6dda5b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -313,7 +314,26 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+        final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+        validationResults.add(consumerPartitionsResult);
+
+        final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+        if (explicitPartitionMapping) {
+            final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+            if (TOPIC_PATTERN.getValue().equals(topicType)) {
+                validationResults.add(new ValidationResult.Builder()
+                    .subject(TOPIC_TYPE.getDisplayName())
+                    .input(TOPIC_PATTERN.getDisplayName())
+                    .valid(false)
+                    .explanation("It is not valid to explicitly assign Topic Partitions and also use a Topic Pattern. "
+                        + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
+                    .build());
+            }
+        }
+
+        return validationResults;
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -322,7 +342,24 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
             return pool;
         }
 
-        return consumerPool = createConsumerPool(context, getLogger());
+        final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+        final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+        if (numAssignedPartitions > 0) {
+            // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+            // all of the partitions assigned.
+            final int partitionCount = consumerPool.getPartitionCount();
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+                consumerPool.close();
+
+                throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+                    " topic(s) have " + partitionCount + " partitions");
+            }
+        }
+
+        this.consumerPool = consumerPool;
+        return consumerPool;
     }
 
     protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -355,6 +392,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
         final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
 
+        final int[] partitionsToConsume;
+        try {
+            partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+        } catch (final UnknownHostException uhe) {
+            throw new ProcessException("Could not determine localhost's hostname", uhe);
+        }
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -364,11 +408,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
index 5461abb..5e432b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java
@@ -42,6 +42,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -287,7 +288,26 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+
+        final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
+        validationResults.add(consumerPartitionsResult);
+
+        final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
+        if (explicitPartitionMapping) {
+            final String topicType = validationContext.getProperty(TOPIC_TYPE).getValue();
+            if (TOPIC_PATTERN.getValue().equals(topicType)) {
+                validationResults.add(new ValidationResult.Builder()
+                    .subject(TOPIC_TYPE.getDisplayName())
+                    .input(TOPIC_PATTERN.getDisplayName())
+                    .valid(false)
+                    .explanation("It is not valid to explicitly assign topic partitions while also using a Topic Pattern. "
+                        + "Topic Partitions may be assigned only when topic names are explicitly specified.")
+                    .build());
+            }
+        }
+
+        return validationResults;
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -296,7 +316,24 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
             return pool;
         }
 
-        return consumerPool = createConsumerPool(context, getLogger());
+        final ConsumerPool consumerPool = createConsumerPool(context, getLogger());
+
+        final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
+        if (numAssignedPartitions > 0) {
+            // Request from Kafka the number of partitions for the topics that we are consuming from. Then ensure that we have
+            // all of the partitions assigned.
+            final int partitionCount = consumerPool.getPartitionCount();
+            if (partitionCount != numAssignedPartitions) {
+                context.yield();
+                consumerPool.close();
+
+                throw new ProcessException("Illegal Partition Assignment: There are " + numAssignedPartitions + " partitions statically assigned using the partitions.* property names, but the Kafka" +
+                    " topic(s) have " + partitionCount + " partitions");
+            }
+        }
+
+        this.consumerPool = consumerPool;
+        return consumerPool;
     }
 
     protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
@@ -329,6 +366,13 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
 
         final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
 
+        final int[] partitionsToConsume;
+        try {
+            partitionsToConsume = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
+        } catch (final UnknownHostException uhe) {
+            throw new ProcessException("Could not determine localhost's hostname", uhe);
+        }
+
         if (topicType.equals(TOPIC_NAME.getValue())) {
             for (final String topic : topicListing.split(",", 100)) {
                 final String trimmedName = topic.trim();
@@ -338,11 +382,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor {
             }
 
             return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
         } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
             final Pattern topicPattern = Pattern.compile(topicListing.trim());
             return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
-                bootstrapServers, log, honorTransactions, charset, headerNamePattern);
+                bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
         } else {
             getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
             return null;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
new file mode 100644
index 0000000..bf71e78
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPartitionsUtil.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ConsumerPartitionsUtil {
+    public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";
+
+    public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException {
+        final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString);
+
+        if (partitionsByHost.isEmpty()) {
+            // Explicit partitioning is not enabled.
+            logger.debug("No explicit Consumer Partitions have been declared.");
+            return null;
+        }
+
+        logger.info("Found the following mapping of hosts to partitions: {}", new Object[] {hostnameToPartitionString});
+
+        // Determine the partitions based on hostname/IP.
+        int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+        if (partitionsForThisHost == null) {
+            throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
+        }
+
+        return partitionsForThisHost;
+    }
+
+    private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) {
+        final Map<String, int[]> partitionsByHost = new HashMap<>();
+        for (final Map.Entry<String, String> entry : partitionValues.entrySet()) {
+            final String host = entry.getKey();
+            final int[] partitions = parsePartitions(host, entry.getValue());
+            partitionsByHost.put(host, partitions);
+        }
+
+        return partitionsByHost;
+    }
+
+    private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException {
+        // Determine the partitions based on hostname/IP.
+        final InetAddress localhost = InetAddress.getLocalHost();
+        int[] partitionsForThisHost = partitionsByHost.get(localhost.getCanonicalHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        partitionsForThisHost = partitionsByHost.get(localhost.getHostName());
+        if (partitionsForThisHost != null) {
+            return partitionsForThisHost;
+        }
+
+        return partitionsByHost.get(localhost.getHostAddress());
+    }
+
+    private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionString = new HashMap<>();
+        for (final Map.Entry<String, String> entry : properties.entrySet()) {
+            final String propertyName = entry.getKey();
+            if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) {
+                continue;
+            }
+
+            if (propertyName.length() <= PARTITION_PROPERTY_NAME_PREFIX.length()) {
+                continue;
+            }
+
+            final String propertyNameAfterPrefix = propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length());
+            hostnameToPartitionString.put(propertyNameAfterPrefix, entry.getValue());
+        }
+
+        return hostnameToPartitionString;
+    }
+
+    private static int[] parsePartitions(final String hostname, final String propertyValue) {
+        final String[] splits = propertyValue.split(",");
+        final List<Integer> partitionList = new ArrayList<>();
+        for (final String split : splits) {
+            if (split.trim().isEmpty()) {
+                continue;
+            }
+
+            try {
+                final int partition = Integer.parseInt(split.trim());
+                if (partition < 0) {
+                    throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative");
+                }
+
+                partitionList.add(partition);
+            } catch (final NumberFormatException nfe) {
+                throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer");
+            }
+        }
+
+        // Map out List<Integer> to int[]
+        return partitionList.stream().mapToInt(Integer::intValue).toArray();
+    }
+
+    public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        if (hostnameToPartitionMapping.isEmpty()) {
+            // Partitions are not being explicitly assigned.
+            return new ValidationResult.Builder().valid(true).build();
+        }
+
+        final Set<Integer> partitionsClaimed = new HashSet<>();
+        final Set<Integer> duplicatePartitions = new HashSet<>();
+        for (final Map.Entry<String, String> entry : hostnameToPartitionMapping.entrySet()) {
+            final int[] partitions = parsePartitions(entry.getKey(), entry.getValue());
+            for (final int partition : partitions) {
+                final boolean added = partitionsClaimed.add(partition);
+                if (!added) {
+                    duplicatePartitions.add(partition);
+                }
+            }
+        }
+
+        final List<Integer> partitionsMissing = new ArrayList<>();
+        for (int i=0; i < partitionsClaimed.size(); i++) {
+            if (!partitionsClaimed.contains(i)) {
+                partitionsMissing.add(i);
+            }
+        }
+
+        if (!partitionsMissing.isEmpty()) {
+            return new ValidationResult.Builder()
+                .subject("Partitions")
+                .input(partitionsClaimed.toString())
+                .valid(false)
+                .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
+                .build();
+        }
+
+        if (!duplicatePartitions.isEmpty()) {
+            return new ValidationResult.Builder()
+                .subject("Partitions")
+                .input(partitionsClaimed.toString())
+                .valid(false)
+                .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
+                .build();
+        }
+
+        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+        final int[] partitionsForThisHost;
+        try {
+            partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
+        } catch (UnknownHostException e) {
+            return new ValidationResult.Builder()
+                .valid(false)
+                .subject("Partition Assignment")
+                .explanation("Unable to determine hostname of localhost")
+                .build();
+        }
+
+        if (partitionsForThisHost == null) {
+            return new ValidationResult.Builder()
+                .subject("Partition Assignment")
+                .valid(false)
+                .explanation("No assignment was given for this host")
+                .build();
+        }
+
+        return new ValidationResult.Builder().valid(true).build();
+    }
+
+    public static boolean isPartitionAssignmentExplicit(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        return !hostnameToPartitionMapping.isEmpty();
+    }
+
+    public static int getPartitionAssignmentCount(final Map<String, String> properties) {
+        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
+        final Map<String, int[]> partitions = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
+
+        int count = 0;
+        for (final int[] partitionArray : partitions.values()) {
+            count += partitionArray.length;
+        }
+
+        return count;
+    }
+}
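
As a point of reference, here is a minimal sketch of how the new ConsumerPartitionsUtil could be exercised to resolve the local node's partitions from the processor's dynamic properties. The hostnames and partition lists below are hypothetical, and MockComponentLogger comes from the nifi-mock module that the new unit test also uses:

    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.mock.MockComponentLogger;
    import org.apache.nifi.processors.kafka.pubsub.ConsumerPartitionsUtil;

    import java.net.InetAddress;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class ConsumerPartitionsUtilSketch {
        public static void main(final String[] args) throws Exception {
            final ComponentLog logger = new MockComponentLogger();

            // Dynamic properties as they would be entered on ConsumeKafka_2_6 / ConsumeKafkaRecord_2_6
            final Map<String, String> properties = new HashMap<>();
            properties.put("partitions." + InetAddress.getLocalHost().getHostName(), "0, 1, 2");
            properties.put("partitions.other-node.example.com", "3, 4, 5"); // hypothetical second node

            // Resolves to {0, 1, 2} on this host; returns null when no partitions.* properties are present
            final int[] partitionsForThisHost = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
            System.out.println(Arrays.toString(partitionsForThisHost));

            // Total number of statically assigned partitions across all hosts: 6 in this example
            System.out.println(ConsumerPartitionsUtil.getPartitionAssignmentCount(properties));
        }
    }
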
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 2a33298..46bf97a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -19,6 +19,8 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -60,6 +62,7 @@ public class ConsumerPool implements Closeable {
     private final Charset headerCharacterSet;
     private final Pattern headerNamePattern;
     private final boolean separateByKey;
+    private final int[] partitionsToConsume;
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -97,7 +100,8 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -114,6 +118,7 @@ public class ConsumerPool implements Closeable {
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -129,7 +134,8 @@ public class ConsumerPool implements Closeable {
             final ComponentLog logger,
             final boolean honorTransactions,
             final Charset headerCharacterSet,
-            final Pattern headerNamePattern) {
+            final Pattern headerNamePattern,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -146,6 +152,7 @@ public class ConsumerPool implements Closeable {
         this.headerCharacterSet = headerCharacterSet;
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -162,7 +169,8 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final boolean separateByKey,
-            final String keyEncoding) {
+            final String keyEncoding,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -179,6 +187,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
+        this.partitionsToConsume = partitionsToConsume;
     }
 
     public ConsumerPool(
@@ -195,7 +204,8 @@ public class ConsumerPool implements Closeable {
             final Charset headerCharacterSet,
             final Pattern headerNamePattern,
             final boolean separateByKey,
-            final String keyEncoding) {
+            final String keyEncoding,
+            final int[] partitionsToConsume) {
         this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
         this.maxWaitMillis = maxWaitMillis;
         this.logger = logger;
@@ -212,6 +222,29 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
+        this.partitionsToConsume = partitionsToConsume;
+    }
+
+    public int getPartitionCount() {
+        // If using regex for topic names, just return -1
+        if (topics == null || topics.isEmpty()) {
+            return -1;
+        }
+
+        int partitionsEachTopic = 0;
+        try (final Consumer<byte[], byte[]> consumer = createKafkaConsumer()) {
+            for (final String topicName : topics) {
+                final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);
+                final int partitionsThisTopic = partitionInfos.size();
+                if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
+                    throw new IllegalStateException("The specified topic names do not have the same number of partitions");
+                }
+
+                partitionsEachTopic = partitionsThisTopic;
+            }
+        }
+
+        return partitionsEachTopic;
     }
 
     /**
@@ -239,14 +272,26 @@ public class ConsumerPool implements Closeable {
              * sitting idle which could prompt excessive rebalances.
              */
             lease = new SimpleConsumerLease(consumer);
-            /**
-             * This subscription tightly couples the lease to the given
-             * consumer. They cannot be separated from then on.
-             */
-            if (topics != null) {
-              consumer.subscribe(topics, lease);
+
+            if (partitionsToConsume == null) {
+                // This subscription tightly couples the lease to the given
+                // consumer. They cannot be separated from then on.
+                if (topics != null) {
+                    consumer.subscribe(topics, lease);
+                } else {
+                    consumer.subscribe(topicPattern, lease);
+                }
             } else {
-              consumer.subscribe(topicPattern, lease);
+                final List<TopicPartition> topicPartitions = new ArrayList<>();
+
+                for (final String topic : topics) {
+                    for (final int partition : partitionsToConsume) {
+                        final TopicPartition topicPartition = new TopicPartition(topic, partition);
+                        topicPartitions.add(topicPartition);
+                    }
+                }
+
+                consumer.assign(topicPartitions);
             }
         }
         lease.setProcessSession(session, processContext);
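
The key behavioral change when a lease is obtained above is the switch from Consumer#subscribe(), where Kafka's group coordinator balances partitions across the consumer group and may rebalance them at any time, to Consumer#assign(), where the client pins itself to specific TopicPartitions and no rebalancing takes place. A stand-alone sketch of that distinction with the plain Kafka client, where the bootstrap server, group id, and topic name are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;

    public class AssignVersusSubscribeSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                // Group-managed (dynamic) assignment, used when no partitions.* properties are set:
                //     consumer.subscribe(Collections.singletonList("my-topic"));

                // Static assignment, as ConsumerPool now does when partitionsToConsume is non-null:
                consumer.assign(Arrays.asList(new TopicPartition("my-topic", 6), new TopicPartition("my-topic", 7)));
                consumer.poll(Duration.ofSeconds(1));
            }
        }
    }
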
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 44a6984..bdddfad 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -50,6 +50,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.regex.Pattern;
 
 public final class KafkaProcessorUtils {
     private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
@@ -62,6 +63,8 @@ public final class KafkaProcessorUtils {
     static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute",
         "The key will not be added as an Attribute");
 
+    static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
     static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
@@ -353,6 +356,10 @@ public final class KafkaProcessorUtils {
 
         @Override
         public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+            if (subject.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+                return new ValidationResult.Builder().valid(true).build();
+            }
+
             final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
             return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
         }
@@ -398,7 +405,9 @@ public final class KafkaProcessorUtils {
                     ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
                     : context.getProperty(propertyDescriptor).getValue();
 
-            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())
+                && !propertyName.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
+
                 // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
                 // or the standard NiFi time period such as "5 secs"
                 if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
index 320d36e..1150e9c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
@@ -29,6 +29,61 @@
             written to a FlowFile by serializing the message with the configured Record Writer.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions of those topics are randomly
+            assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario, and suppose that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceed to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+            partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+            if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. When the Processor is
+            started, the Processor will immediately begin to fail, logging errors and pulling no data, until the Processor is updated to account
+            for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but not pull data from the newly
+            added partitions. Once stopped and started again, it will log errors until all partitions have been assigned. Additionally, if partitions that are assigned
+            do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+            to log errors on startup and will not pull data.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+            Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+            partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+        </p>
+
 
         <h2>Security Configuration:</h2>
         <p>
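
Tying the example scenario in the documentation above to concrete configuration: for the 8-partition topic consumed by a 3-node cluster, pinning partitions 6 and 7 to Node 3 would amount to adding user-defined properties along these lines (the nifi-node-* hostnames are hypothetical):

    partitions.nifi-node-1=0, 1, 2
    partitions.nifi-node-2=3, 4, 5
    partitions.nifi-node-3=6, 7

As the documentation notes, a node that should consume nothing would still need an entry, with an empty value.
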
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
index efb56c0..be2f380 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6/additionalDetails.html
@@ -29,6 +29,61 @@
             of the Kafka message.
         </p>
 
+        <h2>Consumer Partition Assignment</h2>
+        <p>
+            By default, this processor will subscribe to one or more Kafka topics in such a way that the partitions of those topics are randomly
+            assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
+            NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
+            Node 3 will then be assigned partitions 6 and 7.
+        </p>
+
+        <p>
+            In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
+            For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
+            where this is undesirable.
+        </p>
+
+        <p>
+            One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario, and suppose that Node 3
+            has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
+            15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceed to pull data from
+            Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
+            it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.
+        </p>
+
+        <p>
+            The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
+            Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
+            using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
+            are handled.
+        </p>
+
+        <p>
+            In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
+            <code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
+            <code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
+            The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
+            the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
+            added for the hostname with an empty string as the value.
+        </p>
+
+        <p>
+            NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
+            partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
+            if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. When the Processor is
+            started, the Processor will immediately begin to fail, logging errors and pulling no data, until the Processor is updated to account
+            for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but not pull data from the newly
+            added partitions. Once stopped and started again, it will log errors until all partitions have been assigned. Additionally, if partitions that are assigned
+            do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
+            to log errors on startup and will not pull data.
+        </p>
+
+        <p>
+            In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern." Additionally, all
+            Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
+            partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
+        </p>
+
 
         <h2>Security Configuration</h2>
         <p>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index d006a6e..18e188c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -24,8 +24,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerLease;
-import org.apache.nifi.processors.kafka.pubsub.ConsumerPool;
 import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.junit.Before;
@@ -80,6 +78,7 @@ public class ConsumerPoolTest {
                 logger,
                 true,
                 StandardCharsets.UTF_8,
+                null,
                 null) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
@@ -99,7 +98,8 @@ public class ConsumerPoolTest {
                 logger,
                 true,
                 StandardCharsets.UTF_8,
-                Pattern.compile(".*")) {
+                Pattern.compile(".*"),
+                null) {
             @Override
             protected Consumer<byte[], byte[]> createKafkaConsumer() {
                 return consumer;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
new file mode 100644
index 0000000..1053f2b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.mock.MockComponentLogger;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestConsumerPartitionsUtil {
+    private final ComponentLog logger = new MockComponentLogger();
+    private String hostname;
+
+    @Before
+    public void setup() throws UnknownHostException {
+        hostname = InetAddress.getLocalHost().getHostName();
+    }
+
+    @Test
+    public void testNoPartitionAssignments() throws UnknownHostException {
+        final Map<String, String> properties = Collections.singletonMap("key", "value");
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNull(partitions);
+    }
+
+    @Test
+    public void testAllPartitionsAssignedToOneHost() throws UnknownHostException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "0, 1, 2, 3");
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNotNull(partitions);
+
+        assertArrayEquals(new int[] {0, 1, 2, 3}, partitions);
+    }
+
+    @Test
+    public void testSomePartitionsSkipped() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "0, 1, 2, 3, 5");
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+
+        properties.put("partitions." + hostname, "0, 1,2,3,4, 5");
+        final ValidationResult validResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(validResult);
+        assertTrue(validResult.isValid());
+    }
+
+    @Test
+    public void testCurrentNodeNotSpecified() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+    }
+
+    @Test
+    public void testPartitionListedTwice() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "2");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertFalse(invalidResult.isValid());
+    }
+
+    @Test
+    public void testNodeWithNoAssignment() throws UnknownHostException {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("key", "value");
+        properties.put("partitions." + hostname, "");
+        properties.put("partitions.other-host", "0, 1, 2, 3");
+
+        final ValidationResult invalidResult = ConsumerPartitionsUtil.validateConsumePartitions(properties);
+        assertNotNull(invalidResult);
+        assertTrue(invalidResult.isValid());
+
+        final int[] partitions = ConsumerPartitionsUtil.getPartitionsForHost(properties, logger);
+        assertNotNull(partitions);
+        assertEquals(0, partitions.length);
+    }
+
+}