You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/13 00:55:23 UTC
[kafka] branch 0.11.0 updated: KAFKA-6634: Delay starting new
transaction in task.initializeTopology (#4684)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/0.11.0 by this push:
new 341fd7b KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
341fd7b is described below
commit 341fd7b533447d2ee73867c179403bd16b3de094
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700
KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
As titled: do not start a new transaction when the task is created, since the producer would have no activity during restoration and the transaction may therefore expire. Also delay starting the new transaction when resuming a task until the topology is initialized.
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
.../streams/processor/internals/AssignedTasks.java | 9 +++++++-
.../streams/processor/internals/StreamTask.java | 24 ++++++++++++++--------
.../processor/internals/StreamTaskTest.java | 7 +++++++
3 files changed, 31 insertions(+), 9 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index fc82c6e..d59ec2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -226,7 +226,14 @@ class AssignedTasks<T extends AbstractTask> {
suspended.remove(taskId);
log.trace("{} Resuming suspended {} {} with assigned partitions {}", logPrefix, taskTypeName, taskId, partitions);
task.resume();
- transitionToRunning(task);
+ try {
+ transitionToRunning(task);
+ } catch (final ProducerFencedException e) {
+ closeZombieTask(task);
+ suspended.remove(taskId);
+ running.remove(task.id());
+ throw e;
+ }
return true;
} else {
log.trace("{} couldn't resume task {} assigned partitions {}, task partitions", logPrefix, taskId, partitions, task.partitions);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index b628985..c2cc032 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -139,10 +139,11 @@ public class StreamTask extends AbstractTask implements Punctuator {
this.time = time;
stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+ // initialize transactions if eos is turned on, which will block if the previous transaction has not
+ // completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
this.producer.initTransactions();
- this.producer.beginTransaction();
- transactionInFlight = true;
}
}
@@ -153,26 +154,33 @@ public class StreamTask extends AbstractTask implements Punctuator {
return changelogPartitions().isEmpty();
}
+ /**
+ * <pre>
+ * - (re-)initialize the topology of the task
+ * </pre>
+ * @throws ProducerFencedException
+ */
@Override
public void initializeTopology() {
initTopology();
+
+ if (eosEnabled) {
+ this.producer.beginTransaction();
+ transactionInFlight = true;
+ }
+
processorContext.initialized();
taskInitialized = true;
}
/**
* <pre>
- * - re-initialize the task
- * - if (eos) begin new transaction
+ * - resume the task
* </pre>
*/
@Override
public void resume() {
log.debug("{} Resuming", logPrefix);
- if (eosEnabled) {
- producer.beginTransaction();
- transactionInFlight = true;
- }
initTopology();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 25de93c..0340913 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -635,6 +635,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
assertTrue(producer.transactionInitialized());
assertTrue(producer.transactionInFlight());
@@ -664,6 +665,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -680,6 +682,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.suspend();
assertTrue(producer.transactionCommitted());
@@ -707,6 +710,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -714,6 +718,7 @@ public class StreamTaskTest {
task.suspend();
task.resume();
+ task.initializeTopology();
assertTrue(producer.transactionInFlight());
}
@@ -737,6 +742,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -765,6 +771,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.close(false, false);
task = null;
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.