You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/11 21:40:24 UTC
[kafka] branch trunk updated: KAFKA-6906: Fixed to commit
transactions if data is produced via wall clock punctuation (#5105)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee5cc97 KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
ee5cc97 is described below
commit ee5cc974d2ef449444861d82e1793668184ca86f
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530
KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 19 +++++++-----
.../processor/internals/StreamTaskTest.java | 36 +++++++++++++++++++++-
.../processor/internals/StreamThreadTest.java | 4 +--
3 files changed, 49 insertions(+), 10 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index e2be3e2..4cea528 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -424,17 +424,22 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
- producer.commitTransaction();
- transactionInFlight = false;
- if (startNewTransaction) {
- producer.beginTransaction();
- transactionInFlight = true;
- }
} else {
consumer.commitSync(consumedOffsetsAndMetadata);
}
commitOffsetNeeded = false;
- } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+ }
+
+ if (eosEnabled) {
+ producer.commitTransaction();
+ transactionInFlight = false;
+ if (startNewTransaction) {
+ producer.beginTransaction();
+ transactionInFlight = true;
+ }
+ }
+
+ if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
producer.commitTransaction();
transactionInFlight = false;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 5537335..bfbb2a0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -913,6 +914,7 @@ public class StreamTaskTest {
@Test
public void shouldCloseProducerOnCloseWhenEosEnabled() {
task = createStatelessTask(createConfig(true));
+ task.initializeTopology();
task.close(true, false);
task = null;
@@ -1028,6 +1030,39 @@ public class StreamTaskTest {
assertThat(map, equalTo(Collections.singletonMap(repartition, 11L)));
}
+ @Test
+ public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+ task = createStatelessTask(createConfig(true));
+ try {
+ task.close(true, false);
+ fail("should have throw IllegalStateException");
+ } catch (final IllegalStateException expected) {
+ // pass
+ }
+ task = null;
+
+ assertTrue(producer.closed());
+ }
+
+ @Test
+ public void shouldAlwaysCommitIfEosEnabled() {
+ final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "StreamTask",
+ new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
+
+ task = createStatelessTask(createConfig(true));
+ task.initializeStateStores();
+ task.initializeTopology();
+ task.punctuate(processorSystemTime, 5, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
+ @Override
+ public void punctuate(final long timestamp) {
+ recordCollector.send("result-topic1", 3, 5, null, 0, time.milliseconds(),
+ new IntegerSerializer(), new IntegerSerializer());
+ }
+ });
+ task.commit();
+ assertEquals(1, producer.history().size());
+ }
+
private StreamTask createStatefulTask(final StreamsConfig config, final boolean logged) {
final ProcessorTopology topology = ProcessorTopology.with(
Utils.<ProcessorNode>mkList(source1, source2),
@@ -1144,5 +1179,4 @@ public class StreamTaskTest {
recordValue
);
}
-
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 936c67b..1cc9c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -660,7 +660,7 @@ public class StreamThreadTest {
new TestCondition() {
@Override
public boolean conditionMet() {
- return producer.commitCount() == 1;
+ return producer.commitCount() == 2;
}
},
"StreamsThread did not commit transaction.");
@@ -681,7 +681,7 @@ public class StreamThreadTest {
},
"StreamsThread did not remove fenced zombie task.");
- assertThat(producer.commitCount(), equalTo(1L));
+ assertThat(producer.commitCount(), equalTo(2L));
}
@Test
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.