You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/03/24 14:00:01 UTC

[nifi] branch main updated (057b4af -> 74ea384)

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 057b4af  NIFI-8296 - Use API to retrieve all subjects associated with a schema id for Confluent Schema Registry v5.4.0+
     new 2f08d1f  NIFI-8357: Updated Kafka 2.0 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions
     new 74ea384  NIFI-8357: Updated Kafka 2.6 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../processors/kafka/pubsub/ConsumerLease.java     |   4 +
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++-------
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  86 ++++++++++++++
 .../processors/kafka/pubsub/ConsumerLease.java     |   4 +
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++-------
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  85 ++++++++++++++
 6 files changed, 350 insertions(+), 77 deletions(-)

[nifi] 02/02: NIFI-8357: Updated Kafka 2.6 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions

Posted by tu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 74ea3840ac98c8deff1ab83f673cc8fcb7072bcd
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Mar 23 13:53:26 2021 -0400

    NIFI-8357: Updated Kafka 2.6 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions
    
    This closes #4926.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../processors/kafka/pubsub/ConsumerLease.java     |   4 +
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++-------
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  85 ++++++++++++++
 3 files changed, 175 insertions(+), 38 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index e3e6124..4ba7c8b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -164,6 +164,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
     }
 
+    public List<TopicPartition> getAssignedPartitions() {
+        return null;
+    }
+
     /**
      * Executes a poll on the underlying Kafka Consumer and creates any new
      * flowfiles necessary or appends to existing ones if in demarcation mode.
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 591480a..0895733 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ public class ConsumerPool implements Closeable {
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+    private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<>();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
@@ -119,7 +121,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -154,7 +156,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -190,7 +192,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -226,7 +228,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public int getPartitionCount() {
@@ -262,66 +264,97 @@ public class ConsumerPool implements Closeable {
      * @return consumer to use or null if not available or necessary
      */
     public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
+        // If there are any partition assignments that do not have leases in our pool, create the leases and add them to the pool.
+        // This is not necessary for us to handle if using automatic subscriptions because the Kafka protocol will ensure that each consumer
+        // has the appropriate partitions. However, if we are using explicit assignment, it's important to create these leases and add them
+        // to our pool in order to avoid starvation. E.g., if we have only a single concurrent task and 5 partitions assigned, we cannot simply
+        // wait until pooledLeases.poll() returns null to create a new ConsumerLease, as doing so may result in constantly pulling from only a
+        // single partition (since we'd get a Lease for Partition 1, then use it, and put it back in the pool).
+        recreateAssignedConsumers();
+
         SimpleConsumerLease lease = pooledLeases.poll();
         if (lease == null) {
-            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
-            consumerCreatedCountRef.incrementAndGet();
-            /**
-             * For now return a new consumer lease. But we could later elect to
-             * have this return null if we determine the broker indicates that
-             * the lag time on all topics being monitored is sufficiently low.
-             * For now we should encourage conservative use of threads because
-             * having too many means we'll have at best useless threads sitting
-             * around doing frequent network calls and at worst having consumers
-             * sitting idle which could prompt excessive rebalances.
-             */
-            lease = new SimpleConsumerLease(consumer);
-
-            if (partitionsToConsume == null) {
-                // This subscription tightly couples the lease to the given
-                // consumer. They cannot be separated from then on.
-                if (topics != null) {
-                    consumer.subscribe(topics, lease);
-                } else {
-                    consumer.subscribe(topicPattern, lease);
-                }
-            } else {
-                logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
+            lease = createConsumerLease();
+            if (lease == null) {
                 return null;
             }
         }
+
         lease.setProcessSession(session, processContext);
 
         leasesObtainedCountRef.incrementAndGet();
         return lease;
     }
 
-    private SimpleConsumerLease createConsumerLease(final int partition) {
-        final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (final String topic : topics) {
-            final TopicPartition topicPartition = new TopicPartition(topic, partition);
-            topicPartitions.add(topicPartition);
+    private void recreateAssignedConsumers() {
+        List<TopicPartition> topicPartitions;
+        while ((topicPartitions = availableTopicPartitions.poll()) != null) {
+            final SimpleConsumerLease simpleConsumerLease = createConsumerLease(topicPartitions);
+            pooledLeases.add(simpleConsumerLease);
+        }
+    }
+
+    private SimpleConsumerLease createConsumerLease() {
+        if (partitionsToConsume != null) {
+            logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
+            return null;
         }
 
         final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
         consumerCreatedCountRef.incrementAndGet();
+
+        /*
+         * For now return a new consumer lease. But we could later elect to
+         * have this return null if we determine the broker indicates that
+         * the lag time on all topics being monitored is sufficiently low.
+         * For now we should encourage conservative use of threads because
+         * having too many means we'll have at best useless threads sitting
+         * around doing frequent network calls and at worst having consumers
+         * sitting idle which could prompt excessive rebalances.
+         */
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, null);
+
+        // This subscription tightly couples the lease to the given
+        // consumer. They cannot be separated from then on.
+        if (topics == null) {
+            consumer.subscribe(topicPattern, lease);
+        } else {
+            consumer.subscribe(topics, lease);
+        }
+
+        return lease;
+    }
+
+    private SimpleConsumerLease createConsumerLease(final List<TopicPartition> topicPartitions) {
+        final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+        consumerCreatedCountRef.incrementAndGet();
         consumer.assign(topicPartitions);
 
-        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, topicPartitions);
         return lease;
     }
 
-    private void enqueueLeases(final int[] partitionsToConsume) {
+    private void enqueueAssignedPartitions(final int[] partitionsToConsume) {
         if (partitionsToConsume == null) {
             return;
         }
 
         for (final int partition : partitionsToConsume) {
-            final SimpleConsumerLease lease = createConsumerLease(partition);
-            pooledLeases.add(lease);
+            final List<TopicPartition> topicPartitions = createTopicPartitions(partition);
+            availableTopicPartitions.offer(topicPartitions);
         }
     }
 
+    private List<TopicPartition> createTopicPartitions(final int partition) {
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (final String topic : topics) {
+            final TopicPartition topicPartition = new TopicPartition(topic, partition);
+            topicPartitions.add(topicPartition);
+        }
+
+        return topicPartitions;
+    }
+
     /**
      * Exposed as protected method for easier unit testing
      *
@@ -371,16 +404,17 @@ public class ConsumerPool implements Closeable {
     }
 
     private class SimpleConsumerLease extends ConsumerLease {
-
         private final Consumer<byte[], byte[]> consumer;
+        private final List<TopicPartition> assignedPartitions;
         private volatile ProcessSession session;
         private volatile ProcessContext processContext;
         private volatile boolean closedConsumer;
 
-        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) {
             super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
                 readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey);
             this.consumer = consumer;
+            this.assignedPartitions = assignedPartitions;
         }
 
         void setProcessSession(final ProcessSession session, final ProcessContext context) {
@@ -389,6 +423,11 @@ public class ConsumerPool implements Closeable {
         }
 
         @Override
+        public List<TopicPartition> getAssignedPartitions() {
+            return assignedPartitions;
+        }
+
+        @Override
         public void yield() {
             if (processContext != null) {
                 processContext.yield();
@@ -410,18 +449,27 @@ public class ConsumerPool implements Closeable {
             if (closedConsumer) {
                 return;
             }
+
             super.close();
             if (session != null) {
                 session.rollback();
                 setProcessSession(null, null);
             }
+
             if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
                 closedConsumer = true;
                 closeConsumer(consumer);
+
+                // If explicit topic/partition assignment is used, make the assignments for this Lease available again.
+                if (assignedPartitions != null) {
+                    logger.debug("Adding partitions {} back to the pool", assignedPartitions);
+                    availableTopicPartitions.offer(assignedPartitions);
+                }
             }
         }
     }
 
+
     static final class PoolStats {
 
         final long consumerCreatedCount;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 195d2cb..218fb25 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -41,6 +41,9 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -158,6 +161,88 @@ public class ConsumerPoolTest {
     }
 
     @Test
+    public void testConsumerCreatedOnDemand() {
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
+            final List<ConsumerLease> created = new ArrayList<>();
+            try {
+                for (int i = 0; i < 3; i++) {
+                    final ConsumerLease newLease = testPool.obtainConsumer(mockSession, mockContext);
+                    created.add(newLease);
+                    assertNotSame(lease, newLease);
+                }
+            } finally {
+                created.forEach(ConsumerLease::close);
+            }
+        }
+    }
+
+    @Test
+    public void testConsumerNotCreatedOnDemandWhenUsingStaticAssignment() {
+        final ConsumerPool staticAssignmentPool = new ConsumerPool(
+            1,
+            null,
+            false,
+            Collections.emptyMap(),
+            Collections.singletonList("nifi"),
+            100L,
+            "utf-8",
+            "ssl",
+            "localhost",
+            logger,
+            true,
+            StandardCharsets.UTF_8,
+            null,
+            new int[] {1, 2, 3}) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+
+        try (final ConsumerLease lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext)) {
+            ConsumerLease partition2Lease = null;
+            ConsumerLease partition3Lease = null;
+
+            try {
+                partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition2Lease);
+                assertEquals(1, partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition());
+
+                partition3Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition3Lease);
+                assertNotSame(partition2Lease, partition3Lease);
+                assertEquals(1, partition3Lease.getAssignedPartitions().size());
+                assertEquals(3, partition3Lease.getAssignedPartitions().get(0).partition());
+
+                final ConsumerLease nullLease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNull(nullLease);
+
+                // Close the lease for Partition 2. We should now be able to get another Lease for Partition 2.
+                partition2Lease.close();
+
+                partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotNull(partition2Lease);
+
+                assertEquals(1, partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition());
+
+                assertNull(staticAssignmentPool.obtainConsumer(mockSession, mockContext));
+            } finally {
+                closeLeases(partition2Lease, partition3Lease);
+            }
+        }
+    }
+
+    private void closeLeases(final ConsumerLease... leases) {
+        for (final ConsumerLease lease : leases) {
+            if (lease != null) {
+                lease.close();
+            }
+        }
+    }
+
+    @Test
     public void validatePoolSimpleBatchCreateClose() throws Exception {
         when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {

[nifi] 01/02: NIFI-8357: Updated Kafka 2.0 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions

Posted by tu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 2f08d1f466b9f6f0b0b8a7b5893341a0d1433a4e
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Mar 23 13:53:13 2021 -0400

    NIFI-8357: Updated Kafka 2.0 processors to automatically handle recreating Consumer Lease objects when an existing one is poisoned, even if using statically assigned partitions
    
    This closes #4926.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../processors/kafka/pubsub/ConsumerLease.java     |   4 +
 .../nifi/processors/kafka/pubsub/ConsumerPool.java | 124 ++++++++++++++-------
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  86 ++++++++++++++
 3 files changed, 175 insertions(+), 39 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 729c801..d1f47e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -164,6 +164,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
     }
 
+    public List<TopicPartition> getAssignedPartitions() {
+        return null;
+    }
+
     /**
      * Executes a poll on the underlying Kafka Consumer and creates any new
      * flowfiles necessary or appends to existing ones if in demarcation mode.
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index 60f301b..e8603ff 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ public class ConsumerPool implements Closeable {
     private final AtomicLong consumerCreatedCountRef = new AtomicLong();
     private final AtomicLong consumerClosedCountRef = new AtomicLong();
     private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+    private final Queue<List<TopicPartition>> availableTopicPartitions = new LinkedBlockingQueue<>();
 
     /**
      * Creates a pool of KafkaConsumer objects that will grow up to the maximum
@@ -119,7 +121,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -154,7 +156,7 @@ public class ConsumerPool implements Closeable {
         this.headerNamePattern = headerNamePattern;
         this.separateByKey = separateByKey;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -190,7 +192,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public ConsumerPool(
@@ -226,7 +228,7 @@ public class ConsumerPool implements Closeable {
         this.separateByKey = separateByKey;
         this.keyEncoding = keyEncoding;
         this.partitionsToConsume = partitionsToConsume;
-        enqueueLeases(partitionsToConsume);
+        enqueueAssignedPartitions(partitionsToConsume);
     }
 
     public int getPartitionCount() {
@@ -262,67 +264,97 @@ public class ConsumerPool implements Closeable {
      * @return consumer to use or null if not available or necessary
      */
     public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
+        // If there are any partition assignments that do not have leases in our pool, create the leases and add them to the pool.
+        // This is not necessary for us to handle if using automatic subscriptions because the Kafka protocol will ensure that each consumer
+        // has the appropriate partitions. However, if we are using explicit assignment, it's important to create these leases and add them
+        // to our pool in order to avoid starvation. E.g., if we have only a single concurrent task and 5 partitions assigned, we cannot simply
+        // wait until pooledLeases.poll() returns null to create a new ConsumerLease, as doing so may result in constantly pulling from only a
+        // single partition (since we'd get a Lease for Partition 1, then use it, and put it back in the pool).
+        recreateAssignedConsumers();
+
         SimpleConsumerLease lease = pooledLeases.poll();
         if (lease == null) {
-            final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
-            consumerCreatedCountRef.incrementAndGet();
-
-            /*
-             * For now return a new consumer lease. But we could later elect to
-             * have this return null if we determine the broker indicates that
-             * the lag time on all topics being monitored is sufficiently low.
-             * For now we should encourage conservative use of threads because
-             * having too many means we'll have at best useless threads sitting
-             * around doing frequent network calls and at worst having consumers
-             * sitting idle which could prompt excessive rebalances.
-             */
-            lease = new SimpleConsumerLease(consumer);
-
-            if (partitionsToConsume == null) {
-                // This subscription tightly couples the lease to the given
-                // consumer. They cannot be separated from then on.
-                if (topics != null) {
-                    consumer.subscribe(topics, lease);
-                } else {
-                    consumer.subscribe(topicPattern, lease);
-                }
-            } else {
-                logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
+            lease = createConsumerLease();
+            if (lease == null) {
                 return null;
             }
         }
+
         lease.setProcessSession(session, processContext);
 
         leasesObtainedCountRef.incrementAndGet();
         return lease;
     }
 
-    private SimpleConsumerLease createConsumerLease(final int partition) {
-        final List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (final String topic : topics) {
-            final TopicPartition topicPartition = new TopicPartition(topic, partition);
-            topicPartitions.add(topicPartition);
+    private void recreateAssignedConsumers() {
+        List<TopicPartition> topicPartitions;
+        while ((topicPartitions = availableTopicPartitions.poll()) != null) {
+            final SimpleConsumerLease simpleConsumerLease = createConsumerLease(topicPartitions);
+            pooledLeases.add(simpleConsumerLease);
+        }
+    }
+
+    private SimpleConsumerLease createConsumerLease() {
+        if (partitionsToConsume != null) {
+            logger.debug("Cannot obtain lease to communicate with Kafka. Since partitions are explicitly assigned, cannot create a new lease.");
+            return null;
         }
 
         final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
         consumerCreatedCountRef.incrementAndGet();
+
+        /*
+         * For now return a new consumer lease. But we could later elect to
+         * have this return null if we determine the broker indicates that
+         * the lag time on all topics being monitored is sufficiently low.
+         * For now we should encourage conservative use of threads because
+         * having too many means we'll have at best useless threads sitting
+         * around doing frequent network calls and at worst having consumers
+         * sitting idle which could prompt excessive rebalances.
+         */
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, null);
+
+        // This subscription tightly couples the lease to the given
+        // consumer. They cannot be separated from then on.
+        if (topics == null) {
+            consumer.subscribe(topicPattern, lease);
+        } else {
+            consumer.subscribe(topics, lease);
+        }
+
+        return lease;
+    }
+
+    private SimpleConsumerLease createConsumerLease(final List<TopicPartition> topicPartitions) {
+        final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+        consumerCreatedCountRef.incrementAndGet();
         consumer.assign(topicPartitions);
 
-        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer);
+        final SimpleConsumerLease lease = new SimpleConsumerLease(consumer, topicPartitions);
         return lease;
     }
 
-    private void enqueueLeases(final int[] partitionsToConsume) {
+    private void enqueueAssignedPartitions(final int[] partitionsToConsume) {
         if (partitionsToConsume == null) {
             return;
         }
 
         for (final int partition : partitionsToConsume) {
-            final SimpleConsumerLease lease = createConsumerLease(partition);
-            pooledLeases.add(lease);
+            final List<TopicPartition> topicPartitions = createTopicPartitions(partition);
+            availableTopicPartitions.offer(topicPartitions);
         }
     }
 
+    private List<TopicPartition> createTopicPartitions(final int partition) {
+        final List<TopicPartition> topicPartitions = new ArrayList<>();
+        for (final String topic : topics) {
+            final TopicPartition topicPartition = new TopicPartition(topic, partition);
+            topicPartitions.add(topicPartition);
+        }
+
+        return topicPartitions;
+    }
+
     /**
      * Exposed as protected method for easier unit testing
      *
@@ -372,16 +404,17 @@ public class ConsumerPool implements Closeable {
     }
 
     private class SimpleConsumerLease extends ConsumerLease {
-
         private final Consumer<byte[], byte[]> consumer;
+        private final List<TopicPartition> assignedPartitions;
         private volatile ProcessSession session;
         private volatile ProcessContext processContext;
         private volatile boolean closedConsumer;
 
-        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+        private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) {
             super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
                 readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey);
             this.consumer = consumer;
+            this.assignedPartitions = assignedPartitions;
         }
 
         void setProcessSession(final ProcessSession session, final ProcessContext context) {
@@ -390,6 +423,11 @@ public class ConsumerPool implements Closeable {
         }
 
         @Override
+        public List<TopicPartition> getAssignedPartitions() {
+            return assignedPartitions;
+        }
+
+        @Override
         public void yield() {
             if (processContext != null) {
                 processContext.yield();
@@ -411,14 +449,22 @@ public class ConsumerPool implements Closeable {
             if (closedConsumer) {
                 return;
             }
+
             super.close();
             if (session != null) {
                 session.rollback();
                 setProcessSession(null, null);
             }
+
             if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
                 closedConsumer = true;
                 closeConsumer(consumer);
+
+                // If explicit topic/partition assignment is used, make the assignments for this Lease available again.
+                if (assignedPartitions != null) {
+                    logger.debug("Adding partitions {} back to the pool", assignedPartitions);
+                    availableTopicPartitions.offer(assignedPartitions);
+                }
             }
         }
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 195d2cb..347bf02 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -41,6 +41,9 @@ import java.util.UUID;
 import java.util.regex.Pattern;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -158,6 +161,89 @@ public class ConsumerPoolTest {
     }
 
     @Test
+    public void testConsumerCreatedOnDemand() {
+        // No lease is closed inside the loop, so each request must yield a non-null lease distinct from all earlier ones.
+        final List<ConsumerLease> created = new ArrayList<>();
+        try {
+            for (int i = 0; i < 4; i++) {
+                final ConsumerLease newLease = testPool.obtainConsumer(mockSession, mockContext);
+                assertNotNull(newLease);
+                created.forEach(existing -> assertNotSame(existing, newLease));
+                created.add(newLease);
+            }
+        } finally {
+            created.forEach(ConsumerLease::close);
+        }
+    }
+
+    @Test
+    public void testConsumerNotCreatedOnDemandWhenUsingStaticAssignment() {
+        final ConsumerPool staticAssignmentPool = new ConsumerPool(
+            1,
+            null,
+            false,
+            Collections.emptyMap(),
+            Collections.singletonList("nifi"),
+            100L,
+            "utf-8",
+            "ssl",
+            "localhost",
+            logger,
+            true,
+            StandardCharsets.UTF_8,
+            null,
+            new int[] {1, 2, 3}) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+        // Expectation under test: one lease per assigned partition (1, 2, 3), null beyond that, and reuse after close().
+        try (final ConsumerLease lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext)) {
+            ConsumerLease partition2Lease = null;
+            ConsumerLease partition3Lease = null;
+            // Declared before the inner try so that the finally block can close whichever leases were obtained.
+            try {
+                partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition2Lease);
+                assertEquals(1, partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition());
+                // A third simultaneous lease is expected to carry exactly one TopicPartition, for partition 3.
+                partition3Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotSame(lease, partition3Lease);
+                assertNotSame(partition2Lease, partition3Lease);
+                assertEquals(1, partition3Lease.getAssignedPartitions().size());
+                assertEquals(3, partition3Lease.getAssignedPartitions().get(0).partition());
+                // Three leases are now open for the three configured partitions, so a fourth request should give null.
+                final ConsumerLease nullLease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNull(nullLease);
+
+                // Close the lease for Partition 2. We should now be able to get another Lease for Partition 2.
+                partition2Lease.close();
+
+                partition2Lease = staticAssignmentPool.obtainConsumer(mockSession, mockContext);
+                assertNotNull(partition2Lease);
+                // The re-obtained lease should again cover a single TopicPartition whose partition number is 2.
+                assertEquals(1, partition2Lease.getAssignedPartitions().size());
+                assertEquals(2, partition2Lease.getAssignedPartitions().get(0).partition());
+                // With a lease for partition 2 held again, a further request is expected to return null.
+                assertNull(staticAssignmentPool.obtainConsumer(mockSession, mockContext));
+            } finally {
+                closeLeases(partition2Lease, partition3Lease);
+            }
+        }
+    }
+
+    /** Closes every non-null lease in the given array; null entries are skipped. */
+    private void closeLeases(final ConsumerLease... leases) {
+        for (int i = 0; i < leases.length; i++) {
+            if (leases[i] != null) {
+                leases[i].close();
+            }
+        }
+    }
+
+    @Test
     public void validatePoolSimpleBatchCreateClose() throws Exception {
         when(consumer.poll(any(Duration.class))).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {