You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/06/12 23:04:06 UTC
[kafka] branch 1.0 updated: KAFKA-6634: Delay starting new
transaction in task.initializeTopology (#4684)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new b106a45 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
b106a45 is described below
commit b106a4521968cfe4a444b0e8171ab296500e8eb5
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700
KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
As titled, do not start a new transaction when the task is created: during restoration the producer would have no activity, which may cause the transaction to expire. Also, when resuming a task, delay starting the new transaction until the topology is initialized.
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
.../streams/processor/internals/AssignedTasks.java | 9 +++--
.../streams/processor/internals/StreamTask.java | 41 ++++++++++++----------
.../streams/processor/internals/StreamThread.java | 4 +--
.../streams/processor/internals/TaskManager.java | 12 +------
.../processor/internals/AssignedTasksTest.java | 3 +-
.../processor/internals/StreamTaskTest.java | 7 ++++
6 files changed, 40 insertions(+), 36 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 6aa9c03..0d9d04d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -110,6 +110,7 @@ class AssignedTasks implements RestoringTasks {
* @return partitions that are ready to be resumed
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
Set<TopicPartition> initializeNewTasks() {
final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -259,14 +260,15 @@ class AssignedTasks implements RestoringTasks {
log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
+ task.resume();
try {
- task.resume();
+ transitionToRunning(task, new HashSet<TopicPartition>());
} catch (final TaskMigratedException e) {
closeZombieTask(task);
suspended.remove(taskId);
+ running.remove(task.id());
throw e;
}
- transitionToRunning(task, new HashSet<TopicPartition>());
log.trace("resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
@@ -286,6 +288,9 @@ class AssignedTasks implements RestoringTasks {
}
}
+ /**
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index c3f015d..df17f29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* @param metrics the {@link StreamsMetrics} created by the thread
* @param stateDirectory the {@link StateDirectory} created by the thread
* @param producer the instance of {@link Producer} used to produce records
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
public StreamTask(final TaskId id,
final String applicationId,
@@ -146,14 +145,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
this.time = time;
stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+ // initialize transactions if eos is turned on, which will block if the previous transaction has not
+ // completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
- try {
- this.producer.initTransactions();
- this.producer.beginTransaction();
- } catch (final ProducerFencedException fatal) {
- throw new TaskMigratedException(this, fatal);
- }
- transactionInFlight = true;
+ this.producer.initTransactions();
}
}
@@ -164,31 +160,38 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return changelogPartitions().isEmpty();
}
+ /**
+ * <pre>
+ * - (re-)initialize the topology of the task
+ * </pre>
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
@Override
public void initializeTopology() {
initTopology();
+
+ if (eosEnabled) {
+ try {
+ this.producer.beginTransaction();
+ } catch (final ProducerFencedException fatal) {
+ throw new TaskMigratedException(this, fatal);
+ }
+ transactionInFlight = true;
+ }
+
processorContext.initialized();
taskInitialized = true;
}
/**
* <pre>
- * - re-initialize the task
- * - if (eos) begin new transaction
+ * - resume the task
* </pre>
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
@Override
public void resume() {
+ // nothing to do; new transaction will be started only after topology is initialized
log.debug("Resuming");
- if (eosEnabled) {
- try {
- producer.beginTransaction();
- } catch (final ProducerFencedException fatal) {
- throw new TaskMigratedException(this, fatal);
- }
- transactionInFlight = true;
- }
initTopology();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index eecf80d..c3f9312 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -344,6 +344,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
*/
Collection<Task> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<Task> createdTasks = new ArrayList<>();
+
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
final TaskId taskId = newTaskAndPartitions.getKey();
final Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
@@ -394,9 +395,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
this.threadClientId = threadClientId;
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
@Override
StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
taskCreatedSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 385729f..70f57c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -68,9 +68,6 @@ class TaskManager {
this.log = logContext.logger(getClass());
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
void createTasks(final Collection<TopicPartition> assignment) {
if (threadMetadataProvider == null) {
throw new IllegalStateException(logPrefix + "taskIdProvider has not been initialized while adding stream tasks. This should not happen.");
@@ -96,9 +93,6 @@ class TaskManager {
this.threadMetadataProvider = threadMetadataProvider;
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
private void addStreamTasks(final Collection<TopicPartition> assignment) {
Map<TaskId, Set<TopicPartition>> assignedTasks = threadMetadataProvider.activeTasks();
if (assignedTasks.isEmpty()) {
@@ -139,9 +133,6 @@ class TaskManager {
}
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
private void addStandbyTasks() {
final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = threadMetadataProvider.standbyTasks();
if (assignedStandbyTasks.isEmpty()) {
@@ -156,7 +147,6 @@ class TaskManager {
if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
newStandbyTasks.put(taskId, partitions);
}
-
}
if (newStandbyTasks.isEmpty()) {
@@ -254,7 +244,7 @@ class TaskManager {
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
- * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+ * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
*/
boolean updateNewAndRestoringTasks() {
final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 81a2ca3..5c8b7c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -256,9 +256,10 @@ public class AssignedTasksTest {
}
@Test
- public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+ public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
mockRunningTaskSuspension();
t1.resume();
+ t1.initializeTopology();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
t1.close(false, true);
EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3321da5..26b4e9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -898,6 +898,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
assertTrue(producer.transactionInitialized());
assertTrue(producer.transactionInFlight());
@@ -928,6 +929,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -944,6 +946,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.suspend();
assertTrue(producer.transactionCommitted());
@@ -971,6 +974,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -978,6 +982,7 @@ public class StreamTaskTest {
task.suspend();
task.resume();
+ task.initializeTopology();
assertTrue(producer.transactionInFlight());
}
@@ -1001,6 +1006,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -1029,6 +1035,7 @@ public class StreamTaskTest {
final MockProducer producer = new MockProducer();
task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
eosConfig, streamsMetrics, stateDirectory, null, time, producer);
+ task.initializeTopology();
task.close(false, false);
task = null;
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.