You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/20 13:34:53 UTC
[kafka] branch 2.1 updated: KAFKA-8290: Close producer for zombie
task (#6636)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 2fec972 KAFKA-8290: Close producer for zombie task (#6636)
2fec972 is described below
commit 2fec972803c03ce6d1e6208c4d4dcecb58e109ed
Author: Bill Bejeck <bb...@gmail.com>
AuthorDate: Mon May 20 09:02:25 2019 -0400
KAFKA-8290: Close producer for zombie task (#6636)
When we close a task and EOS is enabled we should always close the producer regardless if the task is in a zombie state (the broker fenced the producer) or not.
I've added tests that fail without this change.
Reviewers: Matthias J. Sax <mj...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 2 ++
.../processor/internals/StreamTaskTest.java | 35 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 67d457f..e78cd36 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -604,7 +604,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
// can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens
}
+ }
+ if (eosEnabled) {
try {
recordCollector.close();
} catch (final Throwable e) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index ac6f9d4..6aaa1f7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1093,6 +1093,19 @@ public class StreamTaskTest {
}
@Test
+ public void shouldOnlyCloseFencedProducerOnUncleanClosedWithEosEnabled() {
+ task = createStatelessTask(createConfig(true));
+ task.initializeTopology();
+ producer.fenceProducer();
+
+ task.close(false, true);
+ task = null;
+
+ assertFalse(producer.transactionAborted());
+ assertTrue(producer.closed());
+ }
+
+ @Test
public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
task = createStatelessTask(createConfig(true));
task.initializeTopology();
@@ -1147,7 +1160,7 @@ public class StreamTaskTest {
public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
task = createStatelessTask(createConfig(true));
- assertTrue(!producer.transactionInFlight());
+ assertFalse(producer.transactionInFlight());
task.close(false, false);
}
@@ -1305,6 +1318,26 @@ public class StreamTaskTest {
}
@Test
+ public void shouldCloseProducerOnUncleanCloseNotZombieWhenEosEnabled() {
+ task = createStatelessTask(createConfig(true));
+ task.initializeTopology();
+ task.close(false, false);
+ task = null;
+
+ assertTrue(producer.closed());
+ }
+
+ @Test
+ public void shouldCloseProducerOnUncleanCloseIsZombieWhenEosEnabled() {
+ task = createStatelessTask(createConfig(true));
+ task.initializeTopology();
+ task.close(false, true);
+ task = null;
+
+ assertTrue(producer.closed());
+ }
+
+ @Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
task = createTaskThatThrowsException(false);
task.initializeStateStores();