Posted to commits@nifi.apache.org by tu...@apache.org on 2021/04/08 15:51:33 UTC

[nifi] branch main updated: NIFI-8404: Added capability to rollback on failure for PublishKafka(Record)_2_0 and _2_6

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 78b69f1  NIFI-8404: Added capability to rollback on failure for PublishKafka(Record)_2_0 and _2_6
78b69f1 is described below

commit 78b69f10dcef71ea149bdeb8e97073b7175b8e7b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Apr 8 10:27:10 2021 -0400

    NIFI-8404: Added capability to rollback on failure for PublishKafka(Record)_2_0 and _2_6
    
    NIFI-8404: Added unit tests for new Failure Strategy property
    
    This closes #4982.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../kafka/pubsub/KafkaProcessorUtils.java          | 15 +++++
 .../kafka/pubsub/PublishFailureStrategy.java       | 35 ++++++++++
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       | 17 ++++-
 .../processors/kafka/pubsub/PublishKafka_2_0.java  | 75 +++++++++++++---------
 .../kafka/pubsub/TestPublishKafkaRecord_2_0.java   | 60 +++++++++++++++++
 .../kafka/pubsub/TestPublishKafka_2_0.java         | 36 +++++++++++
 .../kafka/pubsub/KafkaProcessorUtils.java          | 15 +++++
 .../kafka/pubsub/PublishFailureStrategy.java       | 35 ++++++++++
 .../kafka/pubsub/PublishKafkaRecord_2_6.java       | 17 ++++-
 .../processors/kafka/pubsub/PublishKafka_2_6.java  | 16 ++++-
 .../kafka/pubsub/TestPublishKafkaRecord_2_6.java   | 60 +++++++++++++++++
 .../kafka/pubsub/TestPublishKafka_2_6.java         | 36 +++++++++++
 12 files changed, 380 insertions(+), 37 deletions(-)
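
In outline, the commit adds a required "Failure Strategy" property to each publisher and resolves it once, at the start of onTrigger, into a PublishFailureStrategy that every failure path then routes through. The following condensed sketch shows the shape of that selection logic; the ProcessSession and FlowFile interfaces here are simplified stand-ins for NiFi's real API, included only so the snippet compiles on its own:

    import java.util.List;

    // Simplified stand-ins for NiFi's ProcessSession and FlowFile (assumed
    // shapes, not the real API) so this sketch is self-contained.
    interface FlowFile {}
    interface ProcessSession {
        void rollback();
        void transfer(List<FlowFile> flowFiles, String relationship);
    }

    // The single-method strategy interface introduced by this commit; since it
    // has one abstract method, the processors implement it with lambdas.
    interface PublishFailureStrategy {
        void routeFlowFiles(ProcessSession session, List<FlowFile> flowFiles);
    }

    class FailureStrategySketch {
        static final String ROLLBACK = "Rollback";   // value of FAILURE_STRATEGY_ROLLBACK
        static final String REL_FAILURE = "failure"; // stands in for the 'failure' relationship

        // Mirrors getFailureStrategy(context) in the diff below: choose a lambda
        // from the configured property value, defaulting to route-to-failure.
        static PublishFailureStrategy forValue(final String strategy) {
            if (ROLLBACK.equals(strategy)) {
                return (session, flowFiles) -> session.rollback();
            }
            return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
        }
    }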

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bdddfad..9950273 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -95,6 +95,12 @@ public final class KafkaProcessorUtils {
     static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
             "The username and password properties must be set when using this mechanism.");
 
+    static final AllowableValue FAILURE_STRATEGY_FAILURE_RELATIONSHIP = new AllowableValue("Route to Failure", "Route to Failure",
+        "When unable to publish a FlowFile to Kafka, the FlowFile will be routed to the 'failure' relationship.");
+    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("Rollback", "Rollback",
+        "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
+            "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+
     public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
             .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
             .displayName("Kafka Brokers")
@@ -192,6 +198,15 @@ public final class KafkaProcessorUtils {
         .required(false)
         .build();
 
+    static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Failure Strategy")
+        .displayName("Failure Strategy")
+        .description("Dictates how the processor handles a FlowFile if it is unable to publish the data to Kafka")
+        .required(true)
+        .allowableValues(FAILURE_STRATEGY_FAILURE_RELATIONSHIP, FAILURE_STRATEGY_ROLLBACK)
+        .defaultValue(FAILURE_STRATEGY_FAILURE_RELATIONSHIP.getValue())
+        .build();
+
     static List<PropertyDescriptor> getCommonPropertyDescriptors() {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
new file mode 100644
index 0000000..878d60f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+/**
+ * Strategy for allowing multiple implementations of handling failure scenarios when publishing data to Kafka
+ */
+public interface PublishFailureStrategy {
+    /**
+     * Routes the FlowFiles to the appropriate destination
+     * @param session the process session that the flowfiles belong to
+     * @param flowFiles the flowfiles to transfer
+     */
+    void routeFlowFiles(ProcessSession session, List<FlowFile> flowFiles);
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 84a8752..d3f9485 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -79,6 +79,8 @@ import java.util.regex.Pattern;
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. "
@@ -289,6 +291,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
         properties.add(USE_TRANSACTIONS);
+        properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
         properties.add(TRANSACTIONAL_ID_PREFIX);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(ATTRIBUTE_NAME_REGEX);
@@ -445,6 +448,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+        final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -503,7 +507,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
                 if (publishResult.isFailure()) {
                     getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                    session.transfer(flowFiles, REL_FAILURE);
+                    failureStrategy.routeFlowFiles(session, flowFiles);
                     return;
                 }
 
@@ -523,7 +527,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
             } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                 lease.poison();
                 getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
-                session.transfer(flowFiles, REL_FAILURE);
+                failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
         }
@@ -557,4 +561,13 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
         return accumulator.intValue();
     }
+
+    private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+        final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+            return (session, flowFiles) -> session.rollback();
+        } else {
+            return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index f0e3f57..6b4b0a2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -30,9 +30,9 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
@@ -64,6 +64,10 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.0 Producer API."
@@ -75,7 +79,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
     description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
-        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+        expressionLanguageScope = VARIABLE_REGISTRY)
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
     + "be greater than 1.")
@@ -107,48 +111,48 @@ public class PublishKafka_2_0 extends AbstractProcessor {
     static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
         "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
 
-    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TOPIC = new Builder()
         .name("topic")
         .displayName("Topic Name")
         .description("The name of the Kafka Topic to publish to.")
         .required(true)
         .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor DELIVERY_GUARANTEE = new Builder()
         .name(ProducerConfig.ACKS_CONFIG)
         .displayName("Delivery Guarantee")
         .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
         .required(true)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
         .defaultValue(DELIVERY_BEST_EFFORT.getValue())
         .build();
 
-    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor METADATA_WAIT_TIME = new Builder()
         .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
         .displayName("Max Metadata Wait Time")
         .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
             + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
         .required(true)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .defaultValue("5 sec")
         .build();
 
-    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor ACK_WAIT_TIME = new Builder()
         .name("ack.wait.time")
         .displayName("Acknowledgment Wait Time")
         .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
             + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(true)
         .defaultValue("5 secs")
         .build();
 
-    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MAX_REQUEST_SIZE = new Builder()
         .name("max.request.size")
         .displayName("Max Request Size")
         .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
@@ -157,7 +161,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .defaultValue("1 MB")
         .build();
 
-    static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor KEY = new Builder()
         .name("kafka-key")
         .displayName("Kafka Key")
         .description("The Key to use for the Message. "
@@ -167,10 +171,10 @@ public class PublishKafka_2_0 extends AbstractProcessor {
             + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
         .required(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new Builder()
         .name("key-attribute-encoding")
         .displayName("Key Attribute Encoding")
         .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
@@ -179,19 +183,19 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .allowableValues(UTF8_ENCODING, HEX_ENCODING)
         .build();
 
-    static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MESSAGE_DEMARCATOR = new Builder()
         .name("message-demarcator")
         .displayName("Message Demarcator")
         .required(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+        .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
             + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
             + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
             + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
         .build();
 
-    static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor PARTITION_CLASS = new Builder()
         .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
         .displayName("Partitioner class")
         .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
@@ -200,7 +204,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .required(false)
         .build();
 
-    static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor PARTITION = new Builder()
         .name("partition")
         .displayName("Partition")
         .description("Specifies which Partition Records will go to.")
@@ -209,7 +213,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
         .build();
 
-    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor COMPRESSION_CODEC = new Builder()
         .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
         .displayName("Compression Type")
         .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
@@ -219,37 +223,37 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         .defaultValue("none")
         .build();
 
-    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new Builder()
         .name("attribute-name-regex")
         .displayName("Attributes to Send as Headers (Regex)")
         .description("A Regular Expression that is matched against all FlowFile attribute names. "
             + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
             + "If not specified, no FlowFile attributes will be added as headers.")
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .required(false)
         .build();
-    static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
         .name("use-transactions")
         .displayName("Use Transactions")
         .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
             + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
             + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
             + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+        .expressionLanguageSupported(NONE)
         .allowableValues("true", "false")
         .defaultValue("true")
         .required(true)
         .build();
-    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new Builder()
         .name("transactional-id-prefix")
         .displayName("Transactional Id Prefix")
         .description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.")
-        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .expressionLanguageSupported(VARIABLE_REGISTRY)
         .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
         .required(false)
         .build();
-    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
         .name("message-header-encoding")
         .displayName("Message Header Encoding")
         .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
@@ -279,6 +283,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
         properties.add(TOPIC);
         properties.add(DELIVERY_GUARANTEE);
+        properties.add(FAILURE_STRATEGY);
         properties.add(USE_TRANSACTIONS);
         properties.add(TRANSACTIONAL_ID_PREFIX);
         properties.add(ATTRIBUTE_NAME_REGEX);
@@ -313,12 +318,12 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
-        return new PropertyDescriptor.Builder()
+        return new Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
             .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
             .dynamic(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
     }
 
@@ -413,6 +418,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
         final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+        final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -460,7 +466,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
                 if (publishResult.isFailure()) {
                     getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                    session.transfer(flowFiles, REL_FAILURE);
+                    failureStrategy.routeFlowFiles(session, flowFiles);
                     return;
                 }
 
@@ -480,12 +486,20 @@ public class PublishKafka_2_0 extends AbstractProcessor {
             } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                 lease.poison();
                 getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
-                session.transfer(flowFiles, REL_FAILURE);
+                failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
         }
     }
 
+    private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+        final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+            return (session, flowFiles) -> session.rollback();
+        } else {
+            return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+        }
+    }
 
     private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
 
@@ -518,5 +532,4 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
         return null;
     }
-
 }
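
The unit tests that follow exercise both strategies; the rollback cases all reduce to the same two assertions: nothing reaches the 'failure' relationship, and the FlowFiles remain in the incoming queue. Condensed from testSingleFailureWithRollback below (and assuming the nifi-mock TestRunner plus the mocked PublisherLease that the test classes wire up):

    runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
    final MockFlowFile flowFile = runner.enqueue("hello world");

    // Force the publish to report failure.
    when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));

    runner.run();

    // Rollback: nothing routed to 'failure', and the FlowFile is re-queued.
    runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 0);
    assertEquals(1, runner.getQueueSize().getObjectCount());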
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index 1803451..dec9176 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -155,6 +155,25 @@ public class TestPublishKafkaRecord_2_0 {
     }
 
     @Test
+    public void testSingleFailureWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+
+    @Test
     public void testFailureWhenCreationgTransaction() {
         runner.enqueue("John Doe, 48");
 
@@ -173,6 +192,26 @@ public class TestPublishKafkaRecord_2_0 {
     }
 
     @Test
+    public void testFailureWhenCreatingTransactionWithRollback() {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.enqueue("John Doe, 48");
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+            }
+        }).when(mockLease).beginTransaction();
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).poison();
+        verify(mockLease, times(1)).close();
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleFailures() throws IOException {
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("John Doe, 48"));
@@ -191,6 +230,27 @@ public class TestPublishKafkaRecord_2_0 {
     }
 
     @Test
+    public void testMultipleFailuresWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleMessagesPerFlowFile() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index 82bd3fb..1cdc249 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -142,6 +142,22 @@ public class TestPublishKafka_2_0 {
     }
 
     @Test
+    public void testSingleFailureWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleFailures() throws IOException {
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("hello world"));
@@ -159,6 +175,26 @@ public class TestPublishKafka_2_0 {
     }
 
     @Test
+    public void testMultipleFailuresWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 0);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleMessagesPerFlowFile() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         flowFiles.add(runner.enqueue("hello world"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bdddfad..9950273 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -95,6 +95,12 @@ public final class KafkaProcessorUtils {
     static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
             "The username and password properties must be set when using this mechanism.");
 
+    static final AllowableValue FAILURE_STRATEGY_FAILURE_RELATIONSHIP = new AllowableValue("Route to Failure", "Route to Failure",
+        "When unable to publish a FlowFile to Kafka, the FlowFile will be routed to the 'failure' relationship.");
+    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("Rollback", "Rollback",
+        "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
+            "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+
     public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
             .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
             .displayName("Kafka Brokers")
@@ -192,6 +198,15 @@ public final class KafkaProcessorUtils {
         .required(false)
         .build();
 
+    static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
+        .name("Failure Strategy")
+        .displayName("Failure Strategy")
+        .description("Dictates how the processor handles a FlowFile if it is unable to publish the data to Kafka")
+        .required(true)
+        .allowableValues(FAILURE_STRATEGY_FAILURE_RELATIONSHIP, FAILURE_STRATEGY_ROLLBACK)
+        .defaultValue(FAILURE_STRATEGY_FAILURE_RELATIONSHIP.getValue())
+        .build();
+
     static List<PropertyDescriptor> getCommonPropertyDescriptors() {
         return Arrays.asList(
                 BOOTSTRAP_SERVERS,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
new file mode 100644
index 0000000..878d60f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+/**
+ * Strategy for allowing multiple implementations of handling failure scenarios when publishing data to Kafka
+ */
+public interface PublishFailureStrategy {
+    /**
+     * Routes the FlowFiles to the appropriate destination
+     * @param session the process session that the flowfiles belong to
+     * @param flowFiles the flowfiles to transfer
+     */
+    void routeFlowFiles(ProcessSession session, List<FlowFile> flowFiles);
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index c701680..2400416 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -79,6 +79,8 @@ import java.util.regex.Pattern;
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.5"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.5 Producer API. "
@@ -289,6 +291,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
         properties.add(USE_TRANSACTIONS);
+        properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
         properties.add(TRANSACTIONAL_ID_PREFIX);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(ATTRIBUTE_NAME_REGEX);
@@ -445,6 +448,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+        final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -503,7 +507,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
 
                 if (publishResult.isFailure()) {
                     getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                    session.transfer(flowFiles, REL_FAILURE);
+                    failureStrategy.routeFlowFiles(session, flowFiles);
                     return;
                 }
 
@@ -523,7 +527,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
             } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                 lease.poison();
                 getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
-                session.transfer(flowFiles, REL_FAILURE);
+                failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
         }
@@ -557,4 +561,13 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
 
         return accumulator.intValue();
     }
+
+    private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+        final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+            return (session, flowFiles) -> session.rollback();
+        } else {
+            return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index 4d05df1..07602a3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -64,6 +64,8 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.5"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.5 Producer API."
@@ -279,6 +281,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
         properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
         properties.add(TOPIC);
         properties.add(DELIVERY_GUARANTEE);
+        properties.add(FAILURE_STRATEGY);
         properties.add(USE_TRANSACTIONS);
         properties.add(TRANSACTIONAL_ID_PREFIX);
         properties.add(ATTRIBUTE_NAME_REGEX);
@@ -413,6 +416,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
         final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+        final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
         final long startTime = System.nanoTime();
         try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -460,7 +464,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
 
                 if (publishResult.isFailure()) {
                     getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
-                    session.transfer(flowFiles, REL_FAILURE);
+                    failureStrategy.routeFlowFiles(session, flowFiles);
                     return;
                 }
 
@@ -480,12 +484,20 @@ public class PublishKafka_2_6 extends AbstractProcessor {
             } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                 lease.poison();
                 getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
-                session.transfer(flowFiles, REL_FAILURE);
+                failureStrategy.routeFlowFiles(session, flowFiles);
                 context.yield();
             }
         }
     }
 
+    private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+        final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+            return (session, flowFiles) -> session.rollback();
+        } else {
+            return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+        }
+    }
 
     private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
index 2624d38..aefc1c0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
@@ -155,6 +155,25 @@ public class TestPublishKafkaRecord_2_6 {
     }
 
     @Test
+    public void testSingleFailureWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+
+    @Test
     public void testFailureWhenCreationgTransaction() {
         runner.enqueue("John Doe, 48");
 
@@ -173,6 +192,26 @@ public class TestPublishKafkaRecord_2_6 {
     }
 
     @Test
+    public void testFailureWhenCreatingTransactionWithRollback() {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.enqueue("John Doe, 48");
+
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(final InvocationOnMock invocationOnMock) {
+                throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+            }
+        }).when(mockLease).beginTransaction();
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).poison();
+        verify(mockLease, times(1)).close();
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleFailures() throws IOException {
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("John Doe, 48"));
@@ -191,6 +230,27 @@ public class TestPublishKafkaRecord_2_6 {
     }
 
     @Test
+    public void testMultipleFailuresWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+        flowFiles.add(runner.enqueue("John Doe, 48"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+            AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleMessagesPerFlowFile() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
index d2232a2..1f410a7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
@@ -142,6 +142,42 @@ public class TestPublishKafka_2_6 {
     }
 
     @Test
+    public void testSingleFailureWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 0);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testMultipleFailuresWithRollback() throws IOException {
+        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 0);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+        verify(mockLease, times(1)).close();
+
+        assertEquals(3, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
     public void testMultipleFailures() throws IOException {
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("hello world"));