You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 02:13:59 UTC

[kafka] branch 0.11.0 updated: KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 51f585d  KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
51f585d is described below

commit 51f585dee65c8489abf6675460a5abfd2c20d663
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530

    KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
 .../streams/processor/internals/StreamTask.java    | 14 ++---
 .../processor/internals/StreamTaskTest.java        | 72 +++++++++++++++++++++-
 2 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c2cc032..4b24aab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -321,12 +321,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
 
             if (eosEnabled) {
                 producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
-                producer.commitTransaction();
-                transactionInFlight = false;
-                if (startNewTransaction) {
-                    producer.beginTransaction();
-                    transactionInFlight = true;
-                }
             } else {
                 try {
                     consumer.commitSync(consumedOffsetsAndMetadata);
@@ -336,9 +330,15 @@ public class StreamTask extends AbstractTask implements Punctuator {
                 }
             }
             commitOffsetNeeded = false;
-        } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+        }
+
+        if (eosEnabled) {
             producer.commitTransaction();
             transactionInFlight = false;
+            if (startNewTransaction) {
+                producer.beginTransaction();
+                transactionInFlight = true;
+            }
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0340913..ecf1044 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -154,7 +154,14 @@ public class StreamTaskTest {
     public void cleanup() throws IOException {
         try {
             if (task != null) {
-                task.close(true, false);
+                try {
+                    task.close(true, false);
+                } catch (final IllegalStateException canHappen) {
+                    if (!"There is no open transaction.".equals(canHappen.getMessage())) {
+                        throw canHappen;
+                    }
+                    // swallow
+                }
             }
         } finally {
             Utils.delete(baseDir);
@@ -566,6 +573,7 @@ public class StreamTaskTest {
                 };
             }
         };
+        streamTask.initializeTopology();
 
         time.sleep(testConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
@@ -806,6 +814,7 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
             changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeTopology();
 
         task.close(true, false);
         task = null;
@@ -878,6 +887,66 @@ public class StreamTaskTest {
         assertFalse(task.initializeStateStores());
     }
 
+    @Test
+    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() { // verifies that a clean close of an EOS task throws IllegalStateException yet still closes the producer
+        final MockProducer producer = new MockProducer();
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer); // note: initializeTopology() is never called on this task in this test
+
+        try {
+            task.close(true, false); // expected to throw IllegalStateException
+            fail("should have throw IllegalStateException"); // reached only if close() returned normally
+        } catch (final IllegalStateException expected) {
+            // pass: close(true, false) threw IllegalStateException, which is what this test requires
+        }
+        task = null; // cleanup() only closes a non-null task, so clearing the field avoids a second close attempt
+
+        assertTrue(producer.closed()); // the producer must end up closed even though close() threw
+    }
+
+    @Test
+    public void shouldAlwaysCommitIfEosEnabled() { // verifies that a record produced only via punctuation is visible in the producer history after commit()
+        final MockProducer producer = new MockProducer();
+
+        final RecordCollectorImpl recordCollector =  new RecordCollectorImpl(producer, "StreamTask"); // wraps the mock producer; used by the overridden punctuate() below
+
+        final MockProcessorNode processor = new MockProcessorNode(10L) {
+            ProcessorContext context; // assigned in init(); not read anywhere in this test
+
+            @Override
+            public void init(final ProcessorContext context) {
+                super.init(context);
+                this.context = context;
+            }
+
+            @Override
+            public void punctuate(final long timestamps) {
+                super.punctuate(timestamps);
+                recordCollector.send("result-topic1", 3, 5, 0, time.milliseconds(),
+                    new IntegerSerializer(),  new IntegerSerializer()); // each punctuation sends exactly one record to "result-topic1"
+            }
+        };
+        final ProcessorTopology topology = new ProcessorTopology( // local topology (shadows the class-level one) that includes the punctuating processor
+            Arrays.<ProcessorNode>asList(source1, source2, processor),
+            new HashMap<String, SourceNode>() {
+                {
+                    put(topic1[0], source1);
+                    put(topic2[0], source2);
+                }
+            },
+            Collections.<String, SinkNode>emptyMap(),
+            Collections.<StateStore>emptyList(),
+            Collections.<String, String>emptyMap(),
+            Collections.<StateStore>emptyList());
+
+        task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+            eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+        task.initializeStateStores();
+        task.initializeTopology();
+        task.punctuate(processor, 5); // runs the overridden punctuate() above; no input records are processed in this test
+        task.commit(); // at this point the only pending output is the single punctuated record
+        assertEquals(1, producer.history().size()); // presumably MockProducer.history() lists only committed sends under EOS, so 1 implies the transaction was committed — TODO confirm
+    }
 
     @SuppressWarnings("unchecked")
     private StreamTask createTaskThatThrowsExceptionOnClose() {
@@ -908,5 +977,4 @@ public class StreamTaskTest {
     private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
         return Arrays.asList(recs);
     }
-
 }

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.