You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 02:13:59 UTC
[kafka] branch 0.11.0 updated: KAFKA-6906: Fixed to commit
transactions if data is produced via wall clock punctuation (#5105)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 51f585d KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
51f585d is described below
commit 51f585dee65c8489abf6675460a5abfd2c20d663
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Tue Jun 12 03:10:03 2018 +0530
KAFKA-6906: Fixed to commit transactions if data is produced via wall clock punctuation (#5105)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StreamTask.java | 14 ++---
.../processor/internals/StreamTaskTest.java | 72 +++++++++++++++++++++-
2 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c2cc032..4b24aab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -321,12 +321,6 @@ public class StreamTask extends AbstractTask implements Punctuator {
if (eosEnabled) {
producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId);
- producer.commitTransaction();
- transactionInFlight = false;
- if (startNewTransaction) {
- producer.beginTransaction();
- transactionInFlight = true;
- }
} else {
try {
consumer.commitSync(consumedOffsetsAndMetadata);
@@ -336,9 +330,15 @@ public class StreamTask extends AbstractTask implements Punctuator {
}
}
commitOffsetNeeded = false;
- } else if (eosEnabled && !startNewTransaction && transactionInFlight) { // need to make sure to commit txn for suspend case
+ }
+
+ if (eosEnabled) {
producer.commitTransaction();
transactionInFlight = false;
+ if (startNewTransaction) {
+ producer.beginTransaction();
+ transactionInFlight = true;
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 0340913..ecf1044 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -154,7 +154,14 @@ public class StreamTaskTest {
public void cleanup() throws IOException {
try {
if (task != null) {
- task.close(true, false);
+ try {
+ task.close(true, false);
+ } catch (final IllegalStateException canHappen) {
+ if (!"There is no open transaction.".equals(canHappen.getMessage())) {
+ throw canHappen;
+ }
+ // swallow
+ }
}
} finally {
Utils.delete(baseDir);
@@ -566,6 +573,7 @@ public class StreamTaskTest {
};
}
};
+ streamTask.initializeTopology();
time.sleep(testConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
@@ -806,6 +814,7 @@ public class StreamTaskTest {
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
changelogReader, eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.close(true, false);
task = null;
@@ -878,6 +887,66 @@ public class StreamTaskTest {
assertFalse(task.initializeStateStores());
}
+ @Test
+ public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
+ final MockProducer producer = new MockProducer();
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+
+ try {
+ task.close(true, false);
+ fail("should have throw IllegalStateException");
+ } catch (final IllegalStateException expected) {
+ // pass
+ }
+ task = null;
+
+ assertTrue(producer.closed());
+ }
+
+ @Test
+ public void shouldAlwaysCommitIfEosEnabled() {
+ final MockProducer producer = new MockProducer();
+
+ final RecordCollectorImpl recordCollector = new RecordCollectorImpl(producer, "StreamTask");
+
+ final MockProcessorNode processor = new MockProcessorNode(10L) {
+ ProcessorContext context;
+
+ @Override
+ public void init(final ProcessorContext context) {
+ super.init(context);
+ this.context = context;
+ }
+
+ @Override
+ public void punctuate(final long timestamps) {
+ super.punctuate(timestamps);
+ recordCollector.send("result-topic1", 3, 5, 0, time.milliseconds(),
+ new IntegerSerializer(), new IntegerSerializer());
+ }
+ };
+ final ProcessorTopology topology = new ProcessorTopology(
+ Arrays.<ProcessorNode>asList(source1, source2, processor),
+ new HashMap<String, SourceNode>() {
+ {
+ put(topic1[0], source1);
+ put(topic2[0], source2);
+ }
+ },
+ Collections.<String, SinkNode>emptyMap(),
+ Collections.<StateStore>emptyList(),
+ Collections.<String, String>emptyMap(),
+ Collections.<StateStore>emptyList());
+
+ task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
+ eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeStateStores();
+ task.initializeTopology();
+ task.punctuate(processor, 5);
+ task.commit();
+ assertEquals(1, producer.history().size());
+ }
@SuppressWarnings("unchecked")
private StreamTask createTaskThatThrowsExceptionOnClose() {
@@ -908,5 +977,4 @@ public class StreamTaskTest {
private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
-
}
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.