You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/05 22:41:43 UTC
[kafka] branch 1.1 updated: KAFKA-6747 Check whether there is
in-flight transaction before aborting transaction (#4826)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new fd35336 KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
fd35336 is described below
commit fd35336d28660e31255070e26f55a5477ced2a83
Author: tedyu <yu...@gmail.com>
AuthorDate: Thu Apr 5 15:29:04 2018 -0700
KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
As Frederic reported on mailing list under the subject "kafka-streams Invalid transition attempted from state READY to state ABORTING_TRANSACTION", producer#abortTransaction should only be called when transactionInFlight is true.
Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
.../org/apache/kafka/streams/processor/internals/StreamTask.java | 4 ++--
.../apache/kafka/streams/processor/internals/StreamTaskTest.java | 8 ++++++++
2 files changed, 10 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d04be04..a033043 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -358,8 +358,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
producer.commitTransaction();
transactionInFlight = false;
if (startNewTransaction) {
- transactionInFlight = true;
producer.beginTransaction();
+ transactionInFlight = true;
}
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
@@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
if (eosEnabled) {
if (!clean) {
try {
- if (!isZombie) {
+ if (!isZombie && transactionInFlight) {
producer.abortTransaction();
}
transactionInFlight = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a305829..d6a5276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -783,6 +783,14 @@ public class StreamTaskTest {
}
@Test
+ public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
+ task = createStatelessTask(true);
+
+ assertTrue(!producer.transactionInFlight());
+ task.close(false, false);
+ }
+
+ @Test
public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
task = createStatelessTask(false);
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.