You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/11 21:40:24 UTC

[kafka] branch trunk updated: KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ee5cc97  KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
ee5cc97 is described below

commit ee5cc974d2ef449444861d82e1793668184ca86f
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530

    KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 19 +++++++-----
 .../processor/internals/StreamTaskTest.java        | 36 +++++++++++++++++++++-
 .../processor/internals/StreamThreadTest.java      |  4 +--
 3 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e2..4cea528 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -424,17 +424,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
                 if (eosEnabled) {
                     producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                    producer.commitTransaction();
-                    transactionInFlight = false;
-                    if (startNewTransaction) {
-                        producer.beginTransaction();
-                        transactionInFlight = true;
-                    }
                 } else {
                     consumer.commitSync(consumedOffsetsAndMetadata);
                 }
                 commitOffsetNeeded = false;
-            } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+            }
+
+            if (eosEnabled) {
+                producer.commitTransaction();
+                transactionInFlight = false;
+                if (startNewTransaction) {
+                    producer.beginTransaction();
+                    transactionInFlight = true;
+                }
+            }
+
+            if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
                 producer.commitTransaction();
                 transactionInFlight = false;
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5537335..bfbb2a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -913,6 +914,7 @@ public class StreamTaskTest {
     @Test
     public void shouldCloseProducerOnCloseWhenEosEnabled() {
         task = createStatelessTask(createConfig(true));
+        task.initializeTopology();
         task.close(true, false);
         task = null;
 
@@ -1028,6 +1030,39 @@ public class StreamTaskTest {
         assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
     }
 
+    @Test
+    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+        task = createStatelessTask(createConfig(true));
+        try {
+            task.close(true, false);
+            fail("should have throw IllegalStateException");
+        } catch (final IllegalStateException expected) {
+            // pass
+        }
+        task = null;
+
+        assertTrue(producer.closed());
+    }
+
+    @Test
+    public void shouldAlwaysCommitIfEosEnabled() {
+        final RecordCollectorImpl recordCollector =  new RecordCollectorImpl(producer, "StreamTask",
+                new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+        task = createStatelessTask(createConfig(true));
+        task.initializeStateStores();
+        task.initializeTopology();
+        task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+            @Override
+            public void punctuate(final long timestamp) {
+                recordCollector.send("result-topic1", 3, 5, null, 0, time.milliseconds(),
+                        new IntegerSerializer(),  new IntegerSerializer());
+            }
+        });
+        task.commit();
+        assertEquals(1, producer.history().size());
+    }
+
     private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
         final ProcessorTopology topology = ProcessorTopology.with(
             Utils.<ProcessorNode>mkList(source1, source2),
@@ -1144,5 +1179,4 @@ public class StreamTaskTest {
             recordValue
         );
     }
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 936c67b..1cc9c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -660,7 +660,7 @@ public class StreamThreadTest {
             new TestCondition() {
                 @Override
                 public boolean conditionMet() {
-                    return producer.commitCount() == 1;
+                    return producer.commitCount() == 2;
                 }
             },
             "StreamsThread did not commit transaction.");
@@ -681,7 +681,7 @@ public class StreamThreadTest {
             },
             "StreamsThread did not remove fenced zombie task.");
 
-        assertThat(producer.commitCount(), equalTo(1L));
+        assertThat(producer.commitCount(), equalTo(2L));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.