You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2021/04/08 15:51:33 UTC
[nifi] branch main updated: NIFI-8404: Added capability to rollback
on failure for PublishKafka(Record)_2_0 and _2_6
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 78b69f1 NIFI-8404: Added capability to rollback on failure for PublishKafka(Record)_2_0 and _2_6
78b69f1 is described below
commit 78b69f10dcef71ea149bdeb8e97073b7175b8e7b
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Apr 8 10:27:10 2021 -0400
NIFI-8404: Added capability to rollback on failure for PublishKafka(Record)_2_0 and _2_6
NIFI-8404: Added unit tests for new Failure Strategy property
This closes #4982.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../kafka/pubsub/KafkaProcessorUtils.java | 15 +++++
.../kafka/pubsub/PublishFailureStrategy.java | 35 ++++++++++
.../kafka/pubsub/PublishKafkaRecord_2_0.java | 17 ++++-
.../processors/kafka/pubsub/PublishKafka_2_0.java | 75 +++++++++++++---------
.../kafka/pubsub/TestPublishKafkaRecord_2_0.java | 60 +++++++++++++++++
.../kafka/pubsub/TestPublishKafka_2_0.java | 36 +++++++++++
.../kafka/pubsub/KafkaProcessorUtils.java | 15 +++++
.../kafka/pubsub/PublishFailureStrategy.java | 35 ++++++++++
.../kafka/pubsub/PublishKafkaRecord_2_6.java | 17 ++++-
.../processors/kafka/pubsub/PublishKafka_2_6.java | 16 ++++-
.../kafka/pubsub/TestPublishKafkaRecord_2_6.java | 60 +++++++++++++++++
.../kafka/pubsub/TestPublishKafka_2_6.java | 36 +++++++++++
12 files changed, 380 insertions(+), 37 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bdddfad..9950273 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -95,6 +95,12 @@ public final class KafkaProcessorUtils {
static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
"The username and password properties must be set when using this mechanism.");
+ static final AllowableValue FAILURE_STRATEGY_FAILURE_RELATIONSHIP = new AllowableValue("Route to Failure", "Route to Failure",
+ "When unable to publish a FlowFile to Kafka, the FlowFile will be routed to the 'failure' relationship.");
+ static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("Rollback", "Rollback",
+ "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
+ "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.displayName("Kafka Brokers")
@@ -192,6 +198,15 @@ public final class KafkaProcessorUtils {
.required(false)
.build();
+ static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Failure Strategy")
+ .displayName("Failure Strategy")
+ .description("Dictates how the processor handles a FlowFile if it is unable to publish the data to Kafka")
+ .required(true)
+ .allowableValues(FAILURE_STRATEGY_FAILURE_RELATIONSHIP, FAILURE_STRATEGY_ROLLBACK)
+ .defaultValue(FAILURE_STRATEGY_FAILURE_RELATIONSHIP.getValue())
+ .build();
+
static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return Arrays.asList(
BOOTSTRAP_SERVERS,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
new file mode 100644
index 0000000..878d60f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+/**
+ * Strategy for allowing multiple implementations of handling failure scenarios when publishing data to Kafka
+ */
+public interface PublishFailureStrategy {
+ /**
+ * Routes the FlowFiles to the appropriate destination
+ * @param session the process session that the flowfiles belong to
+ * @param flowFiles the flowfiles to transfer
+ */
+ void routeFlowFiles(ProcessSession session, List<FlowFile> flowFiles);
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index 84a8752..d3f9485 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -79,6 +79,8 @@ import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. "
@@ -289,6 +291,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(USE_TRANSACTIONS);
+ properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(DELIVERY_GUARANTEE);
properties.add(ATTRIBUTE_NAME_REGEX);
@@ -445,6 +448,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+ final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -503,7 +507,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
if (publishResult.isFailure()) {
getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
return;
}
@@ -523,7 +527,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
} catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
lease.poison();
getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
}
@@ -557,4 +561,13 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
return accumulator.intValue();
}
+
+ private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+ final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+ if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+ return (session, flowFiles) -> session.rollback();
+ } else {
+ return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index f0e3f57..6b4b0a2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -30,9 +30,9 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -64,6 +64,10 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.0"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.0 Producer API."
@@ -75,7 +79,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
- expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
+ expressionLanguageScope = VARIABLE_REGISTRY)
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")
@@ -107,48 +111,48 @@ public class PublishKafka_2_0 extends AbstractProcessor {
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
- static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor TOPIC = new Builder()
.name("topic")
.displayName("Topic Name")
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor DELIVERY_GUARANTEE = new Builder()
.name(ProducerConfig.ACKS_CONFIG)
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
- static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor METADATA_WAIT_TIME = new Builder()
.name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
.displayName("Max Metadata Wait Time")
.description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
- static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor ACK_WAIT_TIME = new Builder()
.name("ack.wait.time")
.displayName("Acknowledgment Wait Time")
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.defaultValue("5 secs")
.build();
- static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor MAX_REQUEST_SIZE = new Builder()
.name("max.request.size")
.displayName("Max Request Size")
.description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
@@ -157,7 +161,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.defaultValue("1 MB")
.build();
- static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor KEY = new Builder()
.name("kafka-key")
.displayName("Kafka Key")
.description("The Key to use for the Message. "
@@ -167,10 +171,10 @@ public class PublishKafka_2_0 extends AbstractProcessor {
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new Builder()
.name("key-attribute-encoding")
.displayName("Key Attribute Encoding")
.description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
@@ -179,19 +183,19 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.allowableValues(UTF8_ENCODING, HEX_ENCODING)
.build();
- static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor MESSAGE_DEMARCATOR = new Builder()
.name("message-demarcator")
.displayName("Message Demarcator")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+ "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+ "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
+ "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
.build();
- static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor PARTITION_CLASS = new Builder()
.name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
.displayName("Partitioner class")
.description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
@@ -200,7 +204,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.required(false)
.build();
- static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor PARTITION = new Builder()
.name("partition")
.displayName("Partition")
.description("Specifies which Partition Records will go to.")
@@ -209,7 +213,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();
- static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor COMPRESSION_CODEC = new Builder()
.name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
.displayName("Compression Type")
.description("This parameter allows you to specify the compression codec for all data generated by this producer.")
@@ -219,37 +223,37 @@ public class PublishKafka_2_0 extends AbstractProcessor {
.defaultValue("none")
.build();
- static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new Builder()
.name("attribute-name-regex")
.displayName("Attributes to Send as Headers (Regex)")
.description("A Regular Expression that is matched against all FlowFile attribute names. "
+ "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(false)
.build();
- static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
.name("use-transactions")
.displayName("Use Transactions")
.description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
+ "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+ "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+ "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
- static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor TRANSACTIONAL_ID_PREFIX = new Builder()
.name("transactional-id-prefix")
.displayName("Transactional Id Prefix")
.description("When Use Transaction is set to true, KafkaProducer config 'transactional.id' will be a generated UUID and will be prefixed with this string.")
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.required(false)
.build();
- static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new Builder()
.name("message-header-encoding")
.displayName("Message Header Encoding")
.description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
@@ -279,6 +283,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
properties.add(TOPIC);
properties.add(DELIVERY_GUARANTEE);
+ properties.add(FAILURE_STRATEGY);
properties.add(USE_TRANSACTIONS);
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(ATTRIBUTE_NAME_REGEX);
@@ -313,12 +318,12 @@ public class PublishKafka_2_0 extends AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
+ return new Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName)
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
.dynamic(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.build();
}
@@ -413,6 +418,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+ final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -460,7 +466,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
if (publishResult.isFailure()) {
getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
return;
}
@@ -480,12 +486,20 @@ public class PublishKafka_2_0 extends AbstractProcessor {
} catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
lease.poison();
getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
}
}
+ private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+ final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+ if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+ return (session, flowFiles) -> session.rollback();
+ } else {
+ return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+ }
+ }
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
@@ -518,5 +532,4 @@ public class PublishKafka_2_0 extends AbstractProcessor {
return null;
}
-
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index 1803451..dec9176 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -155,6 +155,25 @@ public class TestPublishKafkaRecord_2_0 {
}
@Test
+ public void testSingleFailureWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+ final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+
+ @Test
public void testFailureWhenCreationgTransaction() {
runner.enqueue("John Doe, 48");
@@ -173,6 +192,26 @@ public class TestPublishKafkaRecord_2_0 {
}
@Test
+ public void testFailureWhenCreatingTransactionWithRollback() {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ runner.enqueue("John Doe, 48");
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+ }
+ }).when(mockLease).beginTransaction();
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).poison();
+ verify(mockLease, times(1)).close();
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
@@ -191,6 +230,27 @@ public class TestPublishKafkaRecord_2_0 {
}
@Test
+ public void testMultipleFailuresWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 0);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(1)).close();
+
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleMessagesPerFlowFile() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index 82bd3fb..1cdc249 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -142,6 +142,22 @@ public class TestPublishKafka_2_0 {
}
@Test
+ public void testSingleFailureWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ final MockFlowFile flowFile = runner.enqueue("hello world");
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("hello world"));
@@ -159,6 +175,26 @@ public class TestPublishKafka_2_0 {
}
@Test
+ public void testMultipleFailuresWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 0);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleMessagesPerFlowFile() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("hello world"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index bdddfad..9950273 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -95,6 +95,12 @@ public final class KafkaProcessorUtils {
static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
"The username and password properties must be set when using this mechanism.");
+ static final AllowableValue FAILURE_STRATEGY_FAILURE_RELATIONSHIP = new AllowableValue("Route to Failure", "Route to Failure",
+ "When unable to publish a FlowFile to Kafka, the FlowFile will be routed to the 'failure' relationship.");
+ static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("Rollback", "Rollback",
+ "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
+ "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.displayName("Kafka Brokers")
@@ -192,6 +198,15 @@ public final class KafkaProcessorUtils {
.required(false)
.build();
+ static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("Failure Strategy")
+ .displayName("Failure Strategy")
+ .description("Dictates how the processor handles a FlowFile if it is unable to publish the data to Kafka")
+ .required(true)
+ .allowableValues(FAILURE_STRATEGY_FAILURE_RELATIONSHIP, FAILURE_STRATEGY_ROLLBACK)
+ .defaultValue(FAILURE_STRATEGY_FAILURE_RELATIONSHIP.getValue())
+ .build();
+
static List<PropertyDescriptor> getCommonPropertyDescriptors() {
return Arrays.asList(
BOOTSTRAP_SERVERS,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
new file mode 100644
index 0000000..878d60f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishFailureStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+
+import java.util.List;
+
+/**
+ * Strategy for allowing multiple implementations of handling failure scenarios when publishing data to Kafka
+ */
+public interface PublishFailureStrategy {
+ /**
+ * Routes the FlowFiles to the appropriate destination
+ * @param session the process session that the flowfiles belong to
+ * @param flowFiles the flowfiles to transfer
+ */
+ void routeFlowFiles(ProcessSession session, List<FlowFile> flowFiles);
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index c701680..2400416 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -79,6 +79,8 @@ import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.5"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.5 Producer API. "
@@ -289,6 +291,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
properties.add(RECORD_READER);
properties.add(RECORD_WRITER);
properties.add(USE_TRANSACTIONS);
+ properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(DELIVERY_GUARANTEE);
properties.add(ATTRIBUTE_NAME_REGEX);
@@ -445,6 +448,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+ final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -503,7 +507,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
if (publishResult.isFailure()) {
getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
return;
}
@@ -523,7 +527,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
} catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
lease.poison();
getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
}
@@ -557,4 +561,13 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor {
return accumulator.intValue();
}
+
+ private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+ final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+ if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+ return (session, flowFiles) -> session.rollback();
+ } else {
+ return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index 4d05df1..07602a3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -64,6 +64,8 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.5"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.5 Producer API."
@@ -279,6 +281,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
properties.add(TOPIC);
properties.add(DELIVERY_GUARANTEE);
+ properties.add(FAILURE_STRATEGY);
properties.add(USE_TRANSACTIONS);
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(ATTRIBUTE_NAME_REGEX);
@@ -413,6 +416,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+ final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -460,7 +464,7 @@ public class PublishKafka_2_6 extends AbstractProcessor {
if (publishResult.isFailure()) {
getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
return;
}
@@ -480,12 +484,20 @@ public class PublishKafka_2_6 extends AbstractProcessor {
} catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
lease.poison();
getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
- session.transfer(flowFiles, REL_FAILURE);
+ failureStrategy.routeFlowFiles(session, flowFiles);
context.yield();
}
}
}
+ private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
+ final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
+ if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+ return (session, flowFiles) -> session.rollback();
+ } else {
+ return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
+ }
+ }
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
index 2624d38..aefc1c0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_6.java
@@ -155,6 +155,25 @@ public class TestPublishKafkaRecord_2_6 {
}
@Test
+ public void testSingleFailureWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+ final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+
+ @Test
public void testFailureWhenCreationgTransaction() {
runner.enqueue("John Doe, 48");
@@ -173,6 +192,26 @@ public class TestPublishKafkaRecord_2_6 {
}
@Test
+ public void testFailureWhenCreatingTransactionWithRollback() {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ runner.enqueue("John Doe, 48");
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+ }
+ }).when(mockLease).beginTransaction();
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).poison();
+ verify(mockLease, times(1)).close();
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
@@ -191,6 +230,27 @@ public class TestPublishKafkaRecord_2_6 {
}
@Test
+ public void testMultipleFailuresWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+ flowFiles.add(runner.enqueue("John Doe, 48"));
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
+ AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(1)).close();
+
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleMessagesPerFlowFile() throws IOException {
final List<FlowFile> flowFiles = new ArrayList<>();
flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
index d2232a2..1f410a7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_6.java
@@ -142,6 +142,42 @@ public class TestPublishKafka_2_6 {
}
@Test
+ public void testSingleFailureWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+ final MockFlowFile flowFile = runner.enqueue("hello world");
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 0);
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(1, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
+ public void testMultipleFailuresWithRollback() throws IOException {
+ runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+
+ final Set<FlowFile> flowFiles = new HashSet<>();
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+ flowFiles.add(runner.enqueue("hello world"));
+
+ when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_FAILURE, 0);
+
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME), nullable(Integer.class));
+ verify(mockLease, times(1)).close();
+
+ assertEquals(3, runner.getQueueSize().getObjectCount());
+ }
+
+ @Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("hello world"));