You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2020/01/06 16:35:15 UTC
[nifi] branch master updated: NIFI-6983: Ensure that if any call to
kafka's Producer throws a ProducerFencedException that it's handled
properly by poisoning the lease, which in turn will close the client
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new b03e5b0 NIFI-6983: Ensure that if any call to kafka's Producer throws a ProducerFencedException that it's handled properly by poisoning the lease, which in turn will close the client
b03e5b0 is described below
commit b03e5b0520230d90d112a1e7958fc17adbed3834
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jan 6 10:46:58 2020 -0500
NIFI-6983: Ensure that if any call to kafka's Producer throws a ProducerFencedException that it's handled properly by poisoning the lease, which in turn will close the client
This closes #6983.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../processors/kafka/pubsub/PublisherLease.java | 61 ++++++----
.../processors/kafka/pubsub/PublisherLease.java | 38 +++++--
.../kafka/pubsub/TestPublisherLease.java | 104 +++++++++--------
.../kafka/pubsub/PublishKafkaRecord_2_0.java | 124 +++++++++++----------
.../processors/kafka/pubsub/PublishKafka_2_0.java | 108 ++++++++++--------
.../processors/kafka/pubsub/PublisherLease.java | 38 +++++--
.../kafka/pubsub/TestPublishKafkaRecord_2_0.java | 19 ++++
.../kafka/pubsub/TestPublishKafka_2_0.java | 24 +++-
.../kafka/pubsub/TestPublisherLease.java | 103 +++++++++--------
9 files changed, 375 insertions(+), 244 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 0165210..150c7b4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -17,19 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -48,6 +35,19 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import org.apache.nifi.stream.io.util.StreamDemarcator;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
public class PublisherLease implements Closeable {
private final ComponentLog logger;
private final Producer<byte[], byte[]> producer;
@@ -88,12 +88,18 @@ public class PublisherLease implements Closeable {
return;
}
- if (!transactionsInitialized) {
- producer.initTransactions();
- transactionsInitialized = true;
+ try {
+ if (!transactionsInitialized) {
+ producer.initTransactions();
+ transactionsInitialized = true;
+ }
+
+ producer.beginTransaction();
+ } catch (final Exception e) {
+ poison();
+ throw e;
}
- producer.beginTransaction();
activeTransaction = true;
}
@@ -102,7 +108,13 @@ public class PublisherLease implements Closeable {
return;
}
- producer.abortTransaction();
+ try {
+ producer.abortTransaction();
+ } catch (final Exception e) {
+ poison();
+ throw e;
+ }
+
activeTransaction = false;
}
@@ -255,11 +267,16 @@ public class PublisherLease implements Closeable {
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
- producer.flush();
+ try {
+ producer.flush();
- if (activeTransaction) {
- producer.commitTransaction();
- activeTransaction = false;
+ if (activeTransaction) {
+ producer.commitTransaction();
+ activeTransaction = false;
+ }
+ } catch (final Exception e) {
+ poison();
+ throw e;
}
try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f8587d9..4911409 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -89,13 +89,18 @@ public class PublisherLease implements Closeable {
return;
}
- if (!transactionsInitialized) {
- producer.initTransactions();
- transactionsInitialized = true;
- }
+ try {
+ if (!transactionsInitialized) {
+ producer.initTransactions();
+ transactionsInitialized = true;
+ }
- producer.beginTransaction();
- activeTransaction = true;
+ producer.beginTransaction();
+ activeTransaction = true;
+ } catch (final Exception e) {
+ poison();
+ throw e;
+ }
}
void rollback() {
@@ -103,7 +108,13 @@ public class PublisherLease implements Closeable {
return;
}
- producer.abortTransaction();
+ try {
+ producer.abortTransaction();
+ } catch (final Exception e) {
+ poison();
+ throw e;
+ }
+
activeTransaction = false;
}
@@ -257,11 +268,16 @@ public class PublisherLease implements Closeable {
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
- producer.flush();
+ try {
+ producer.flush();
- if (activeTransaction) {
- producer.commitTransaction();
- activeTransaction = false;
+ if (activeTransaction) {
+ producer.commitTransaction();
+ activeTransaction = false;
+ }
+ } catch (final Exception e) {
+ poison();
+ throw e;
}
try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 156af52..16c825a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -56,7 +57,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-
public class TestPublisherLease {
private ComponentLog logger;
private Producer<byte[], byte[]> producer;
@@ -69,16 +69,8 @@ public class TestPublisherLease {
}
@Test
- public void testPoisonOnException() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
+ public void testPoisonOnException() {
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
@@ -101,25 +93,41 @@ public class TestPublisherLease {
// expected
}
- assertEquals(1, poisonCount.get());
+ assertEquals(1, lease.getPoisonCount());
final PublishResult result = lease.complete();
assertTrue(result.isFailure());
}
@Test
- @SuppressWarnings("unchecked")
- public void testPoisonOnFailure() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
+ public void testPoisonOnExceptionCreatingTransaction() {
+ final PoisonCountingLease lease = new PoisonCountingLease();
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
+ final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+ // Need a size grater than zero to make the lease reads the InputStream.
+ Mockito.when(flowFile.getSize()).thenReturn(1L);
+ doAnswer(new Answer<Object>() {
@Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intenitional exception thrown from unit test");
}
- };
+ }).when(producer).beginTransaction();
+
+ try {
+ lease.beginTransaction();
+ Assert.fail("Expected ProducerFencedException");
+ } catch (final ProducerFencedException pfe) {
+ // expected
+ }
+
+ assertEquals(1, lease.getPoisonCount());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoisonOnFailure() throws IOException {
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
@@ -127,7 +135,7 @@ public class TestPublisherLease {
doAnswer(new Answer<Object>() {
@Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
+ public Object answer(final InvocationOnMock invocation) {
final Callback callback = invocation.getArgument(1);
callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
return null;
@@ -136,7 +144,7 @@ public class TestPublisherLease {
lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
- assertEquals(1, poisonCount.get());
+ assertEquals(1, lease.getPoisonCount());
final PublishResult result = lease.complete();
assertTrue(result.isFailure());
@@ -145,21 +153,12 @@ public class TestPublisherLease {
@Test
@SuppressWarnings("unchecked")
public void testAllDelimitedMessagesSent() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- protected void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
-
+ final PoisonCountingLease lease = new PoisonCountingLease();
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ public Object answer(InvocationOnMock invocation) {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -190,7 +189,7 @@ public class TestPublisherLease {
final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
- assertEquals(0, poisonCount.get());
+ assertEquals(0, lease.getPoisonCount());
verify(producer, times(0)).flush();
@@ -206,21 +205,13 @@ public class TestPublisherLease {
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- protected void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
+ final PoisonCountingLease lease = new PoisonCountingLease();
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ public Object answer(InvocationOnMock invocation) {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -242,11 +233,11 @@ public class TestPublisherLease {
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
- assertEquals(0, poisonCount.get());
+ assertEquals(0, lease.getPoisonCount());
verify(producer, times(0)).flush();
- final PublishResult result = lease.complete();
+ lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
@@ -256,7 +247,7 @@ public class TestPublisherLease {
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = new MockFlowFile(1L);
final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
@@ -284,5 +275,24 @@ public class TestPublisherLease {
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
+ assertEquals(0, lease.getPoisonCount());
+ }
+
+ private class PoisonCountingLease extends PublisherLease {
+ private final AtomicInteger poisonCount = new AtomicInteger(0);
+
+ public PoisonCountingLease() {
+ super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void poison() {
+ poisonCount.incrementAndGet();
+ super.poison();
+ }
+
+ public int getPoisonCount() {
+ return poisonCount.get();
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index fe502b3..84a8752 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -18,6 +18,9 @@
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -445,76 +448,83 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
- if (useTransactions) {
- lease.beginTransaction();
- }
+ try {
+ if (useTransactions) {
+ lease.beginTransaction();
+ }
- // Send each FlowFile to Kafka asynchronously.
- final Iterator<FlowFile> itr = flowFiles.iterator();
- while (itr.hasNext()) {
- final FlowFile flowFile = itr.next();
-
- if (!isScheduled()) {
- // If stopped, re-queue FlowFile instead of sending it
- if (useTransactions) {
- session.rollback();
- lease.rollback();
- return;
- }
+ // Send each FlowFile to Kafka asynchronously.
+ final Iterator<FlowFile> itr = flowFiles.iterator();
+ while (itr.hasNext()) {
+ final FlowFile flowFile = itr.next();
+
+ if (!isScheduled()) {
+ // If stopped, re-queue FlowFile instead of sending it
+ if (useTransactions) {
+ session.rollback();
+ lease.rollback();
+ return;
+ }
- session.transfer(flowFile);
- itr.remove();
- continue;
- }
+ session.transfer(flowFile);
+ itr.remove();
+ continue;
+ }
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
- final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
- final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
+ final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
- try {
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- try {
- final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
- final RecordSet recordSet = reader.createRecordSet();
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ try {
+ final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+ final RecordSet recordSet = reader.createRecordSet();
- final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
- lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
- } catch (final SchemaNotFoundException | MalformedRecordException e) {
- throw new ProcessException(e);
+ final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
+ lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
+ } catch (final SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException(e);
+ }
}
- }
- });
- } catch (final Exception e) {
- // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
- lease.fail(flowFile, e);
- continue;
+ });
+ } catch (final Exception e) {
+ // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
+ lease.fail(flowFile, e);
+ continue;
+ }
}
- }
- // Complete the send
- final PublishResult publishResult = lease.complete();
+ // Complete the send
+ final PublishResult publishResult = lease.complete();
- if (publishResult.isFailure()) {
- getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
- return;
- }
+ if (publishResult.isFailure()) {
+ getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ return;
+ }
- // Transfer any successful FlowFiles.
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
- for (FlowFile success : flowFiles) {
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+ // Transfer any successful FlowFiles.
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+ for (FlowFile success : flowFiles) {
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
- final int msgCount = publishResult.getSuccessfulMessageCount(success);
- success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
- session.adjustCounter("Messages Sent", msgCount, true);
+ final int msgCount = publishResult.getSuccessfulMessageCount(success);
+ success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+ session.adjustCounter("Messages Sent", msgCount, true);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
- session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
- session.transfer(success, REL_SUCCESS);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+ session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+ session.transfer(success, REL_SUCCESS);
+ }
+ } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ lease.poison();
+ getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ context.yield();
}
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index a42860b..f0e3f57 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -18,6 +18,9 @@
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -413,65 +416,72 @@ public class PublishKafka_2_0 extends AbstractProcessor {
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
- if (useTransactions) {
- lease.beginTransaction();
- }
+ try {
+ if (useTransactions) {
+ lease.beginTransaction();
+ }
- // Send each FlowFile to Kafka asynchronously.
- for (final FlowFile flowFile : flowFiles) {
- if (!isScheduled()) {
- // If stopped, re-queue FlowFile instead of sending it
- if (useTransactions) {
- session.rollback();
- lease.rollback();
- return;
- }
+ // Send each FlowFile to Kafka asynchronously.
+ for (final FlowFile flowFile : flowFiles) {
+ if (!isScheduled()) {
+ // If stopped, re-queue FlowFile instead of sending it
+ if (useTransactions) {
+ session.rollback();
+ lease.rollback();
+ return;
+ }
- session.transfer(flowFile);
- continue;
- }
+ session.transfer(flowFile);
+ continue;
+ }
- final byte[] messageKey = getMessageKey(flowFile, context);
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
- final byte[] demarcatorBytes;
- if (useDemarcator) {
- demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
- } else {
- demarcatorBytes = null;
- }
+ final byte[] messageKey = getMessageKey(flowFile, context);
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ final byte[] demarcatorBytes;
+ if (useDemarcator) {
+ demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+ } else {
+ demarcatorBytes = null;
+ }
- final Integer partition = getPartition(context, flowFile);
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream rawIn) throws IOException {
- try (final InputStream in = new BufferedInputStream(rawIn)) {
- lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
+ final Integer partition = getPartition(context, flowFile);
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ try (final InputStream in = new BufferedInputStream(rawIn)) {
+ lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
+ }
}
- }
- });
- }
+ });
+ }
- // Complete the send
- final PublishResult publishResult = lease.complete();
+ // Complete the send
+ final PublishResult publishResult = lease.complete();
- if (publishResult.isFailure()) {
- getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
- session.transfer(flowFiles, REL_FAILURE);
- return;
- }
+ if (publishResult.isFailure()) {
+ getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ return;
+ }
- // Transfer any successful FlowFiles.
- final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
- for (FlowFile success : flowFiles) {
- final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+ // Transfer any successful FlowFiles.
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+ for (FlowFile success : flowFiles) {
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
- final int msgCount = publishResult.getSuccessfulMessageCount(success);
- success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
- session.adjustCounter("Messages Sent", msgCount, true);
+ final int msgCount = publishResult.getSuccessfulMessageCount(success);
+ success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+ session.adjustCounter("Messages Sent", msgCount, true);
- final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
- session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
- session.transfer(success, REL_SUCCESS);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+ session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+ session.transfer(success, REL_SUCCESS);
+ }
+ } catch (final ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ lease.poison();
+ getLogger().error("Failed to send messages to Kafka; will yield Processor and transfer FlowFiles to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ context.yield();
}
}
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f8587d9..4911409 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -89,13 +89,18 @@ public class PublisherLease implements Closeable {
return;
}
- if (!transactionsInitialized) {
- producer.initTransactions();
- transactionsInitialized = true;
- }
+ try {
+ if (!transactionsInitialized) {
+ producer.initTransactions();
+ transactionsInitialized = true;
+ }
- producer.beginTransaction();
- activeTransaction = true;
+ producer.beginTransaction();
+ activeTransaction = true;
+ } catch (final Exception e) {
+ poison();
+ throw e;
+ }
}
void rollback() {
@@ -103,7 +108,13 @@ public class PublisherLease implements Closeable {
return;
}
- producer.abortTransaction();
+ try {
+ producer.abortTransaction();
+ } catch (final Exception e) {
+ poison();
+ throw e;
+ }
+
activeTransaction = false;
}
@@ -257,11 +268,16 @@ public class PublisherLease implements Closeable {
throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
}
- producer.flush();
+ try {
+ producer.flush();
- if (activeTransaction) {
- producer.commitTransaction();
- activeTransaction = false;
+ if (activeTransaction) {
+ producer.commitTransaction();
+ activeTransaction = false;
+ }
+ } catch (final Exception e) {
+ poison();
+ throw e;
}
try {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index e7b3c15..1803451 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -154,6 +155,24 @@ public class TestPublishKafkaRecord_2_0 {
}
@Test
+ public void testFailureWhenCreationgTransaction() {
+ runner.enqueue("John Doe, 48");
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+ }
+ }).when(mockLease).beginTransaction();
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_0.REL_FAILURE, 1);
+
+ verify(mockLease, times(1)).poison();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
public void testMultipleFailures() throws IOException {
final Set<FlowFile> flowFiles = new HashSet<>();
flowFiles.add(runner.enqueue("John Doe, 48"));
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index 14fb5d4..82bd3fb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
@@ -24,6 +25,8 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.io.InputStream;
@@ -41,6 +44,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -93,7 +97,6 @@ public class TestPublishKafka_2_0 {
flowFiles.add(runner.enqueue("hello world"));
flowFiles.add(runner.enqueue("hello world"));
-
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));
runner.run();
@@ -106,6 +109,25 @@ public class TestPublishKafka_2_0 {
}
@Test
+ public void testPublisherPoisonedIfFencedDuringTransactionCreation() {
+ runner.enqueue("hello world");
+ runner.enqueue("Hello World");
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intentional ProducedFencedException for unit test");
+ }
+ }).when(mockLease).beginTransaction();
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_0.REL_FAILURE, 2);
+
+ verify(mockLease, times(1)).poison();
+ verify(mockLease, times(1)).close();
+ }
+
+ @Test
public void testSingleFailure() throws IOException {
final MockFlowFile flowFile = runner.enqueue("hello world");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 60d157c..16c825a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.kafka.pubsub;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -68,16 +69,8 @@ public class TestPublisherLease {
}
@Test
- public void testPoisonOnException() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
+ public void testPoisonOnException() {
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
// Need a size grater than zero to make the lease reads the InputStream.
@@ -100,25 +93,41 @@ public class TestPublisherLease {
// expected
}
- assertEquals(1, poisonCount.get());
+ assertEquals(1, lease.getPoisonCount());
final PublishResult result = lease.complete();
assertTrue(result.isFailure());
}
@Test
- @SuppressWarnings("unchecked")
- public void testPoisonOnFailure() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
+ public void testPoisonOnExceptionCreatingTransaction() {
+ final PoisonCountingLease lease = new PoisonCountingLease();
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
+ final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+ // Need a size grater than zero to make the lease reads the InputStream.
+ Mockito.when(flowFile.getSize()).thenReturn(1L);
+ doAnswer(new Answer<Object>() {
@Override
- public void poison() {
- poisonCount.incrementAndGet();
- super.poison();
+ public Object answer(final InvocationOnMock invocationOnMock) {
+ throw new ProducerFencedException("Intenitional exception thrown from unit test");
}
- };
+ }).when(producer).beginTransaction();
+
+ try {
+ lease.beginTransaction();
+ Assert.fail("Expected ProducerFencedException");
+ } catch (final ProducerFencedException pfe) {
+ // expected
+ }
+ assertEquals(1, lease.getPoisonCount());
+ }
+
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPoisonOnFailure() throws IOException {
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = new MockFlowFile(1L);
final String topic = "unit-test";
final byte[] messageKey = null;
@@ -126,7 +135,7 @@ public class TestPublisherLease {
doAnswer(new Answer<Object>() {
@Override
- public Object answer(final InvocationOnMock invocation) throws Throwable {
+ public Object answer(final InvocationOnMock invocation) {
final Callback callback = invocation.getArgument(1);
callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
return null;
@@ -135,7 +144,7 @@ public class TestPublisherLease {
lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic, null);
- assertEquals(1, poisonCount.get());
+ assertEquals(1, lease.getPoisonCount());
final PublishResult result = lease.complete();
assertTrue(result.isFailure());
@@ -144,21 +153,12 @@ public class TestPublisherLease {
@Test
@SuppressWarnings("unchecked")
public void testAllDelimitedMessagesSent() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- protected void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
-
+ final PoisonCountingLease lease = new PoisonCountingLease();
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ public Object answer(InvocationOnMock invocation) {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -189,7 +189,7 @@ public class TestPublisherLease {
final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic, null);
- assertEquals(0, poisonCount.get());
+ assertEquals(0, lease.getPoisonCount());
verify(producer, times(0)).flush();
@@ -205,21 +205,13 @@ public class TestPublisherLease {
@Test
@SuppressWarnings("unchecked")
public void testZeroByteMessageSent() throws IOException {
- final AtomicInteger poisonCount = new AtomicInteger(0);
-
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
- @Override
- protected void poison() {
- poisonCount.incrementAndGet();
- super.poison();
- }
- };
+ final PoisonCountingLease lease = new PoisonCountingLease();
final AtomicInteger correctMessages = new AtomicInteger(0);
final AtomicInteger incorrectMessages = new AtomicInteger(0);
doAnswer(new Answer<Object>() {
@Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
+ public Object answer(InvocationOnMock invocation) {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
final byte[] value = record.value();
final String valueString = new String(value, StandardCharsets.UTF_8);
@@ -241,11 +233,11 @@ public class TestPublisherLease {
final byte[] flowFileContent = new byte[0];
lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic, null);
- assertEquals(0, poisonCount.get());
+ assertEquals(0, lease.getPoisonCount());
verify(producer, times(0)).flush();
- final PublishResult result = lease.complete();
+ lease.complete();
assertEquals(1, correctMessages.get());
assertEquals(0, incorrectMessages.get());
@@ -255,7 +247,7 @@ public class TestPublisherLease {
@Test
public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
- final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
+ final PoisonCountingLease lease = new PoisonCountingLease();
final FlowFile flowFile = new MockFlowFile(1L);
final byte[] exampleInput = "101, John Doe, 48\n102, Jane Doe, 47".getBytes(StandardCharsets.UTF_8);
@@ -283,5 +275,24 @@ public class TestPublisherLease {
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
verify(producer, times(2)).send(any(), any());
+ assertEquals(0, lease.getPoisonCount());
+ }
+
+ private class PoisonCountingLease extends PublisherLease {
+ private final AtomicInteger poisonCount = new AtomicInteger(0);
+
+ public PoisonCountingLease() {
+ super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void poison() {
+ poisonCount.incrementAndGet();
+ super.poison();
+ }
+
+ public int getPoisonCount() {
+ return poisonCount.get();
+ }
}
}