You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/01/06 16:35:15 UTC

[nifi] branch master updated: NIFI-6983: Ensure that if any call to kafka's Producer throws a ProducerFencedException that it's handled properly by poisoning the lease, which in turn will close the client

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new b03e5b0  NIFI-6983: Ensure that if any call to kafka's Producer throws a ProducerFencedException that it's handled properly by poisoning the lease, which in turn will close the client
b03e5b0 is described below

commit b03e5b0520230d90d112a1e7958fc17adbed3834
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jan 6 10:46:58 2020 -0500

    NIFI-6983: Ensure that if any call to kafka's Producer throws a ProducerFencedException that it's handled properly by poisoning the lease, which in turn will close the client
    
    This closes #6983.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../processors/kafka/pubsub/PublisherLease.java    |  61 ++++++----
 .../processors/kafka/pubsub/PublisherLease.java    |  38 +++++--
 .../kafka/pubsub/TestPublisherLease.java           | 104 +++++++++--------
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       | 124 +++++++++++----------
 .../processors/kafka/pubsub/PublishKafka_2_0.java  | 108 ++++++++++--------
 .../processors/kafka/pubsub/PublisherLease.java    |  38 +++++--
 .../kafka/pubsub/TestPublishKafkaRecord_2_0.java   |  19 ++++
 .../kafka/pubsub/TestPublishKafka_2_0.java         |  24 +++-
 .../kafka/pubsub/TestPublisherLease.java           | 103 +++++++++--------
 9 files changed, 375 insertions(+), 244 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 0165210..150c7b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,19 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
 public class PublisherLease implements Closeable {
     private final ComponentLog logger;
     private final Producer<byte[], byte[]> producer;
@@ -88,12 +88,18 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        if (!transactionsInitialized) {
-            producer.initTransactions();
-            transactionsInitialized = true;
+        try {
+            if (!transactionsInitialized) {
+                producer.initTransactions();
+                transactionsInitialized = true;
+            }
+
+            producer.beginTransaction();
+        } catch (final Exception e) {
+            poison();
+            throw e;
         }
 
-        producer.beginTransaction();
         activeTransaction = true;
     }
 
@@ -102,7 +108,13 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        producer.abortTransaction();
+        try {
+            producer.abortTransaction();
+        } catch (final Exception e) {
+            poison();
+            throw e;
+        }
+
         activeTransaction = false;
     }
 
@@ -255,11 +267,16 @@ public class PublisherLease implements Closeable {
             throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
         }
 
-        producer.flush();
+        try {
+            producer.flush();
 
-        if (activeTransaction) {
-            producer.commitTransaction();
-            activeTransaction = false;
+            if (activeTransaction) {
+                producer.commitTransaction();
+                activeTransaction = false;
+            }
+        } catch (final Exception e) {
+            poison();
+            throw e;
         }
 
         try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f8587d9..4911409 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -89,13 +89,18 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        if (!transactionsInitialized) {
-            producer.initTransactions();
-            transactionsInitialized = true;
-        }
+        try {
+            if (!transactionsInitialized) {
+                producer.initTransactions();
+                transactionsInitialized = true;
+            }
 
-        producer.beginTransaction();
-        activeTransaction = true;
+            producer.beginTransaction();
+            activeTransaction = true;
+        } catch (final Exception e) {
+            poison();
+            throw e;
+        }
     }
 
     void rollback() {
@@ -103,7 +108,13 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        producer.abortTransaction();
+        try {
+            producer.abortTransaction();
+        } catch (final Exception e) {
+            poison();
+            throw e;
+        }
+
         activeTransaction = false;
     }
 
@@ -257,11 +268,16 @@ public class PublisherLease implements Closeable {
             throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
         }
 
-        producer.flush();
+        try {
+            producer.flush();
 
-        if (activeTransaction) {
-            producer.commitTransaction();
-            activeTransaction = false;
+            if (activeTransaction) {
+                producer.commitTransaction();
+                activeTransaction = false;
+            }
+        } catch (final Exception e) {
+            poison();
+            throw e;
         }
 
         try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 156af52..16c825a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -56,7 +57,6 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
-
 public class TestPublisherLease {
     private ComponentLog logger;
     private Producer<byte[], byte[]> producer;
@@ -69,16 +69,8 @@ public class TestPublisherLease {
     }
 
     @Test
-    public void testPoisonOnException() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
+    public void testPoisonOnException() {
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
         // Need a size grater than zero to make the lease reads the InputStream.
@@ -101,25 +93,41 @@ public class TestPublisherLease {
             // expected
         }
 
-        assertEquals(1, poisonCount.get());
+        assertEquals(1, lease.getPoisonCount());
 
         final PublishResult result = lease.complete();
         assertTrue(result.isFailure());
     }
 
     @Test
-    @SuppressWarnings("unchecked")
-    public void testPoisonOnFailure() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
+    public void testPoisonOnExceptionCreatingTransaction() {
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
+        doAnswer(new Answer<Object>() {
             @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intenitional exception thrown from unit test");
             }
-        };
+        }).when(producer).beginTransaction();
+
+        try {
+            lease.beginTransaction();
+            Assert.fail("Expected ProducerFencedException");
+        } catch (final ProducerFencedException pfe) {
+            // expected
+        }
+
+        assertEquals(1, lease.getPoisonCount());
+    }
 
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPoisonOnFailure() throws IOException {
+        final PoisonCountingLease lease = new PoisonCountingLease();
         final FlowFile flowFile = new MockFlowFile(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
@@ -127,7 +135,7 @@ public class TestPublisherLease {
 
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(final InvocationOnMock invocation) throws Throwable {
+            public Object answer(final InvocationOnMock invocation) {
                 final Callback callback = invocation.getArgument(1);
                 callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
                 return null;
@@ -136,7 +144,7 @@ public class TestPublisherLease {
 
         lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(1, poisonCount.get());
+        assertEquals(1, lease.getPoisonCount());
 
         final PublishResult result = lease.complete();
         assertTrue(result.isFailure());
@@ -145,21 +153,12 @@ public class TestPublisherLease {
     @Test
     @SuppressWarnings("unchecked")
     public void testAllDelimitedMessagesSent() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            protected void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
-
+        final PoisonCountingLease lease = new PoisonCountingLease();
         final AtomicInteger correctMessages = new AtomicInteger(0);
         final AtomicInteger incorrectMessages = new AtomicInteger(0);
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+            public Object answer(InvocationOnMock invocation) {
                 final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
                 final byte[] value = record.value();
                 final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -190,7 +189,7 @@ public class TestPublisherLease {
         final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
         lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(0, poisonCount.get());
+        assertEquals(0, lease.getPoisonCount());
 
         verify(producer, times(0)).flush();
 
@@ -206,21 +205,13 @@ public class TestPublisherLease {
     @Test
     @SuppressWarnings("unchecked")
     public void testZeroByteMessageSent() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            protected void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final AtomicInteger correctMessages = new AtomicInteger(0);
         final AtomicInteger incorrectMessages = new AtomicInteger(0);
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+            public Object answer(InvocationOnMock invocation) {
                 final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
                 final byte[] value = record.value();
                 final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -242,11 +233,11 @@ public class TestPublisherLease {
         final byte[] flowFileContent = new byte[0];
         lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(0, poisonCount.get());
+        assertEquals(0, lease.getPoisonCount());
 
         verify(producer, times(0)).flush();
 
-        final PublishResult result = lease.complete();
+        lease.complete();
 
         assertEquals(1, correctMessages.get());
         assertEquals(0, incorrectMessages.get());
@@ -256,7 +247,7 @@ public class TestPublisherLease {
 
     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final FlowFile flowFile = new MockFlowFile(1L);
         final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
@@ -284,5 +275,24 @@ public class TestPublisherLease {
         verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
         verify(writer, times(2)).write(any(Record.class));
         verify(producer, times(2)).send(any(), any());
+        assertEquals(0, lease.getPoisonCount());
+    }
+
+    private class PoisonCountingLease extends PublisherLease {
+        private final AtomicInteger poisonCount  = new AtomicInteger(0);
+
+        public PoisonCountingLease() {
+            super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void poison() {
+            poisonCount.incrementAndGet();
+            super.poison();
+        }
+
+        public int getPoisonCount() {
+            return poisonCount.get();
+        }
     }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index fe502b3..84a8752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -18,6 +18,9 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -445,76 +448,83 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
-            if (useTransactions) {
-                lease.beginTransaction();
-            }
+            try {
+                if (useTransactions) {
+                    lease.beginTransaction();
+                }
 
-            // Send each FlowFile to Kafka asynchronously.
-            final Iterator<FlowFile> itr = flowFiles.iterator();
-            while (itr.hasNext()) {
-                final FlowFile flowFile = itr.next();
-
-                if (!isScheduled()) {
-                    // If stopped, re-queue FlowFile instead of sending it
-                    if (useTransactions) {
-                        session.rollback();
-                        lease.rollback();
-                        return;
-                    }
+                // Send each FlowFile to Kafka asynchronously.
+                final Iterator<FlowFile> itr = flowFiles.iterator();
+                while (itr.hasNext()) {
+                    final FlowFile flowFile = itr.next();
+
+                    if (!isScheduled()) {
+                        // If stopped, re-queue FlowFile instead of sending it
+                        if (useTransactions) {
+                            session.rollback();
+                            lease.rollback();
+                            return;
+                        }
 
-                    session.transfer(flowFile);
-                    itr.remove();
-                    continue;
-                }
+                        session.transfer(flowFile);
+                        itr.remove();
+                        continue;
+                    }
 
-                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                    final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
 
-                final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
+                    final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
 
-                try {
-                    session.read(flowFile, new InputStreamCallback() {
-                        @Override
-                        public void process(final InputStream in) throws IOException {
-                            try {
-                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
-                                final RecordSet recordSet = reader.createRecordSet();
+                    try {
+                        session.read(flowFile, new InputStreamCallback() {
+                            @Override
+                            public void process(final InputStream in) throws IOException {
+                                try {
+                                    final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+                                    final RecordSet recordSet = reader.createRecordSet();
 
-                                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
-                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
-                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
-                                throw new ProcessException(e);
+                                    final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
+                                    lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
+                                } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                                    throw new ProcessException(e);
+                                }
                             }
-                        }
-                    });
-                } catch (final Exception e) {
-                    // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
-                    lease.fail(flowFile, e);
-                    continue;
+                        });
+                    } catch (final Exception e) {
+                        // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
+                        lease.fail(flowFile, e);
+                        continue;
+                    }
                 }
-            }
 
-            // Complete the send
-            final PublishResult publishResult = lease.complete();
+                // Complete the send
+                final PublishResult publishResult = lease.complete();
 
-            if (publishResult.isFailure()) {
-                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                session.transfer(flowFiles, REL_FAILURE);
-                return;
-            }
+                if (publishResult.isFailure()) {
+                    getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+                    session.transfer(flowFiles, REL_FAILURE);
+                    return;
+                }
 
-            // Transfer any successful FlowFiles.
-            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
-            for (FlowFile success : flowFiles) {
-                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+                // Transfer any successful FlowFiles.
+                final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+                for (FlowFile success : flowFiles) {
+                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
 
-                final int msgCount = publishResult.getSuccessfulMessageCount(success);
-                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
-                session.adjustCounter("Messages Sent", msgCount, true);
+                    final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                    success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                    session.adjustCounter("Messages Sent", msgCount, true);
 
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
-                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
-                session.transfer(success, REL_SUCCESS);
+                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                    session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                    session.transfer(success, REL_SUCCESS);
+                }
+            } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+                lease.poison();
+                getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
+                session.transfer(flowFiles, REL_FAILURE);
+                context.yield();
             }
         }
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index a42860b..f0e3f57 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -18,6 +18,9 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -413,65 +416,72 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
-            if (useTransactions) {
-                lease.beginTransaction();
-            }
+            try {
+                if (useTransactions) {
+                    lease.beginTransaction();
+                }
 
-            // Send each FlowFile to Kafka asynchronously.
-            for (final FlowFile flowFile : flowFiles) {
-                if (!isScheduled()) {
-                    // If stopped, re-queue FlowFile instead of sending it
-                    if (useTransactions) {
-                        session.rollback();
-                        lease.rollback();
-                        return;
-                    }
+                // Send each FlowFile to Kafka asynchronously.
+                for (final FlowFile flowFile : flowFiles) {
+                    if (!isScheduled()) {
+                        // If stopped, re-queue FlowFile instead of sending it
+                        if (useTransactions) {
+                            session.rollback();
+                            lease.rollback();
+                            return;
+                        }
 
-                    session.transfer(flowFile);
-                    continue;
-                }
+                        session.transfer(flowFile);
+                        continue;
+                    }
 
-                final byte[] messageKey = getMessageKey(flowFile, context);
-                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-                final byte[] demarcatorBytes;
-                if (useDemarcator) {
-                    demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
-                } else {
-                    demarcatorBytes = null;
-                }
+                    final byte[] messageKey = getMessageKey(flowFile, context);
+                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+                    final byte[] demarcatorBytes;
+                    if (useDemarcator) {
+                        demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+                    } else {
+                        demarcatorBytes = null;
+                    }
 
-                final Integer partition = getPartition(context, flowFile);
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        try (final InputStream in = new BufferedInputStream(rawIn)) {
-                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
+                    final Integer partition = getPartition(context, flowFile);
+                    session.read(flowFile, new InputStreamCallback() {
+                        @Override
+                        public void process(final InputStream rawIn) throws IOException {
+                            try (final InputStream in = new BufferedInputStream(rawIn)) {
+                                lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
+                            }
                         }
-                    }
-                });
-            }
+                    });
+                }
 
-            // Complete the send
-            final PublishResult publishResult = lease.complete();
+                // Complete the send
+                final PublishResult publishResult = lease.complete();
 
-            if (publishResult.isFailure()) {
-                getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                session.transfer(flowFiles, REL_FAILURE);
-                return;
-            }
+                if (publishResult.isFailure()) {
+                    getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+                    session.transfer(flowFiles, REL_FAILURE);
+                    return;
+                }
 
-            // Transfer any successful FlowFiles.
-            final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
-            for (FlowFile success : flowFiles) {
-                final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+                // Transfer any successful FlowFiles.
+                final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+                for (FlowFile success : flowFiles) {
+                    final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
 
-                final int msgCount = publishResult.getSuccessfulMessageCount(success);
-                success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
-                session.adjustCounter("Messages Sent", msgCount, true);
+                    final int msgCount = publishResult.getSuccessfulMessageCount(success);
+                    success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+                    session.adjustCounter("Messages Sent", msgCount, true);
 
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
-                session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
-                session.transfer(success, REL_SUCCESS);
+                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                    session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+                    session.transfer(success, REL_SUCCESS);
+                }
+            } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+                lease.poison();
+                getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
+                session.transfer(flowFiles, REL_FAILURE);
+                context.yield();
             }
         }
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f8587d9..4911409 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -89,13 +89,18 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        if (!transactionsInitialized) {
-            producer.initTransactions();
-            transactionsInitialized = true;
-        }
+        try {
+            if (!transactionsInitialized) {
+                producer.initTransactions();
+                transactionsInitialized = true;
+            }
 
-        producer.beginTransaction();
-        activeTransaction = true;
+            producer.beginTransaction();
+            activeTransaction = true;
+        } catch (final Exception e) {
+            poison();
+            throw e;
+        }
     }
 
     void rollback() {
@@ -103,7 +108,13 @@ public class PublisherLease implements Closeable {
             return;
         }
 
-        producer.abortTransaction();
+        try {
+            producer.abortTransaction();
+        } catch (final Exception e) {
+            poison();
+            throw e;
+        }
+
         activeTransaction = false;
     }
 
@@ -257,11 +268,16 @@ public class PublisherLease implements Closeable {
             throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
         }
 
-        producer.flush();
+        try {
+            producer.flush();
 
-        if (activeTransaction) {
-            producer.commitTransaction();
-            activeTransaction = false;
+            if (activeTransaction) {
+                producer.commitTransaction();
+                activeTransaction = false;
+            }
+        } catch (final Exception e) {
+            poison();
+            throw e;
         }
 
         try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index e7b3c15..1803451 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -154,6 +155,24 @@ public class TestPublishKafkaRecord_2_0 {
     }
 
     @Test
+    public void testFailureWhenCreationgTransaction() {
+        runner.enqueue("John Doe, 48");
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+            }
+        }).when(mockLease).beginTransaction();
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
     public void testMultipleFailures() throws IOException {
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("John Doe, 48"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index 14fb5d4..82bd3fb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.kafka.pubsub;
 
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
@@ -24,6 +25,8 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -41,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -93,7 +97,6 @@ public class TestPublishKafka_2_0 {
         flowFiles.add(runner.enqueue("hello world"));
         flowFiles.add(runner.enqueue("hello world"));
 
-
         when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
 
         runner.run();
@@ -106,6 +109,25 @@ public class TestPublishKafka_2_0 {
     }
 
     @Test
+    public void testPublisherPoisonedIfFencedDuringTransactionCreation() {
+        runner.enqueue("hello world");
+        runner.enqueue("Hello World");
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+            }
+        }).when(mockLease).beginTransaction();
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 2);
+
+        verify(mockLease, times(1)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
     public void testSingleFailure() throws IOException {
         final MockFlowFile flowFile = runner.enqueue("hello world");
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 60d157c..16c825a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -68,16 +69,8 @@ public class TestPublisherLease {
     }
 
     @Test
-    public void testPoisonOnException() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
+    public void testPoisonOnException() {
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
         // Need a size grater than zero to make the lease reads the InputStream.
@@ -100,25 +93,41 @@ public class TestPublisherLease {
             // expected
         }
 
-        assertEquals(1, poisonCount.get());
+        assertEquals(1, lease.getPoisonCount());
 
         final PublishResult result = lease.complete();
         assertTrue(result.isFailure());
     }
 
     @Test
-    @SuppressWarnings("unchecked")
-    public void testPoisonOnFailure() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
+    public void testPoisonOnExceptionCreatingTransaction() {
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size greater than zero to make the lease read the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
+        doAnswer(new Answer<Object>() {
             @Override
-            public void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intenitional exception thrown from unit test");
             }
-        };
+        }).when(producer).beginTransaction();
+
+        try {
+            lease.beginTransaction();
+            Assert.fail("Expected ProducerFencedException");
+        } catch (final ProducerFencedException pfe) {
+            // expected
+        }
 
+        assertEquals(1, lease.getPoisonCount());
+    }
+
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPoisonOnFailure() throws IOException {
+        final PoisonCountingLease lease = new PoisonCountingLease();
         final FlowFile flowFile = new MockFlowFile(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
@@ -126,7 +135,7 @@ public class TestPublisherLease {
 
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(final InvocationOnMock invocation) throws Throwable {
+            public Object answer(final InvocationOnMock invocation) {
                 final Callback callback = invocation.getArgument(1);
                 callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
                 return null;
@@ -135,7 +144,7 @@ public class TestPublisherLease {
 
         lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(1, poisonCount.get());
+        assertEquals(1, lease.getPoisonCount());
 
         final PublishResult result = lease.complete();
         assertTrue(result.isFailure());
@@ -144,21 +153,12 @@ public class TestPublisherLease {
     @Test
     @SuppressWarnings("unchecked")
     public void testAllDelimitedMessagesSent() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            protected void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
-
+        final PoisonCountingLease lease = new PoisonCountingLease();
         final AtomicInteger correctMessages = new AtomicInteger(0);
         final AtomicInteger incorrectMessages = new AtomicInteger(0);
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+            public Object answer(InvocationOnMock invocation) {
                 final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
                 final byte[] value = record.value();
                 final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -189,7 +189,7 @@ public class TestPublisherLease {
         final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
         lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(0, poisonCount.get());
+        assertEquals(0, lease.getPoisonCount());
 
         verify(producer, times(0)).flush();
 
@@ -205,21 +205,13 @@ public class TestPublisherLease {
     @Test
     @SuppressWarnings("unchecked")
     public void testZeroByteMessageSent() throws IOException {
-        final AtomicInteger poisonCount = new AtomicInteger(0);
-
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
-            @Override
-            protected void poison() {
-                poisonCount.incrementAndGet();
-                super.poison();
-            }
-        };
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final AtomicInteger correctMessages = new AtomicInteger(0);
         final AtomicInteger incorrectMessages = new AtomicInteger(0);
         doAnswer(new Answer<Object>() {
             @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
+            public Object answer(InvocationOnMock invocation) {
                 final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
                 final byte[] value = record.value();
                 final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -241,11 +233,11 @@ public class TestPublisherLease {
         final byte[] flowFileContent = new byte[0];
         lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
 
-        assertEquals(0, poisonCount.get());
+        assertEquals(0, lease.getPoisonCount());
 
         verify(producer, times(0)).flush();
 
-        final PublishResult result = lease.complete();
+        lease.complete();
 
         assertEquals(1, correctMessages.get());
         assertEquals(0, incorrectMessages.get());
@@ -255,7 +247,7 @@ public class TestPublisherLease {
 
     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
-        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
+        final PoisonCountingLease lease = new PoisonCountingLease();
 
         final FlowFile flowFile = new MockFlowFile(1L);
         final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
@@ -283,5 +275,24 @@ public class TestPublisherLease {
         verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
         verify(writer, times(2)).write(any(Record.class));
         verify(producer, times(2)).send(any(), any());
+        assertEquals(0, lease.getPoisonCount());
+    }
+
+    private class PoisonCountingLease extends PublisherLease {
+        private final AtomicInteger poisonCount  = new AtomicInteger(0);
+
+        public PoisonCountingLease() {
+            super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void poison() {
+            poisonCount.incrementAndGet();
+            super.poison();
+        }
+
+        public int getPoisonCount() {
+            return poisonCount.get();
+        }
     }
 }