You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/05 22:41:43 UTC

[kafka] branch 1.1 updated: KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new fd35336  KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
fd35336 is described below

commit fd35336d28660e31255070e26f55a5477ced2a83
Author: tedyu <yu...@gmail.com>
AuthorDate: Thu Apr 5 15:29:04 2018 -0700

    KAFKA-6747 Check whether there is in-flight transaction before aborting transaction (#4826)
    
    As Frederic reported on the mailing list under the subject "kafka-streams Invalid transition attempted from state READY to state ABORTING_TRANSACTION", producer#abortTransaction should only be called when transactionInFlight is true.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, Matthias J. Sax <ma...@confluent.io>
---
 .../org/apache/kafka/streams/processor/internals/StreamTask.java  | 4 ++--
 .../apache/kafka/streams/processor/internals/StreamTaskTest.java  | 8 ++++++++
 2 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d04be04..a033043 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -358,8 +358,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
                     producer.commitTransaction();
                     transactionInFlight = false;
                     if (startNewTransaction) {
-                        transactionInFlight = true;
                         producer.beginTransaction();
+                        transactionInFlight = true;
                     }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
@@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
             if (eosEnabled) {
                 if (!clean) {
                     try {
-                        if (!isZombie) {
+                        if (!isZombie && transactionInFlight) {
                             producer.abortTransaction();
                         }
                         transactionInFlight = false;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a305829..d6a5276 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -783,6 +783,14 @@ public class StreamTaskTest {
     }
 
     @Test
+    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
+        task = createStatelessTask(true);
+
+        assertTrue(!producer.transactionInFlight());
+        task.close(false, false);
+    }
+
+    @Test
     public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
         task = createStatelessTask(false);
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.