You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/03/13 15:46:15 UTC
[kafka] branch 1.1 updated: KAFKA-6634: Delay starting new
transaction in task.initializeTopology (#4684)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new c7bdec7 KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
c7bdec7 is described below
commit c7bdec74bad366f485d055a68e910dd55cc65728
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Tue Mar 13 08:43:58 2018 -0700
KAFKA-6634: Delay starting new transaction in task.initializeTopology (#4684)
As titled: do not start a new transaction when the task is created, since during restoration the producer would have no activity and the transaction could therefore expire. Likewise, when resuming a task, delay starting the new transaction until its topology has been initialized.
Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
.../streams/processor/internals/AssignedTasks.java | 13 +++++--
.../streams/processor/internals/StreamTask.java | 41 ++++++++++++----------
.../streams/processor/internals/StreamThread.java | 6 ----
.../streams/processor/internals/TaskManager.java | 12 +------
.../internals/AssignedStreamsTasksTest.java | 3 +-
.../processor/internals/StreamTaskTest.java | 7 ++++
6 files changed, 43 insertions(+), 39 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2cd82f4..029f745 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -90,6 +90,7 @@ abstract class AssignedTasks<T extends Task> {
* @return partitions that are ready to be resumed
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
Set<TopicPartition> initializeNewTasks() {
final Set<TopicPartition> readyPartitions = new HashSet<>();
@@ -239,17 +240,22 @@ abstract class AssignedTasks<T extends Task> {
log.trace("found suspended {} {}", taskTypeName, taskId);
if (task.partitions().equals(partitions)) {
suspended.remove(taskId);
+ task.resume();
try {
- task.resume();
+ transitionToRunning(task, new HashSet<TopicPartition>());
} catch (final TaskMigratedException e) {
+ // we need to catch migration exception internally since this function
+ // is triggered in the rebalance callback
+ log.info("Failed to resume {} {} since it got migrated to another thread already. " +
+ "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
final RuntimeException fatalException = closeZombieTask(task);
+ running.remove(task.id());
if (fatalException != null) {
throw fatalException;
}
suspended.remove(taskId);
throw e;
}
- transitionToRunning(task, new HashSet<TopicPartition>());
log.trace("resuming suspended {} {}", taskTypeName, task.id());
return true;
} else {
@@ -269,6 +275,9 @@ abstract class AssignedTasks<T extends Task> {
}
}
+ /**
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6bca02a..d04be04 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -100,7 +100,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
* @param cache the {@link ThreadCache} created by the thread
* @param time the system {@link Time} of the thread
* @param producer the instance of {@link Producer} used to produce records
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
public StreamTask(final TaskId id,
final Collection<TopicPartition> partitions,
@@ -149,14 +148,11 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
partitionGroup = new PartitionGroup(partitionQueues);
stateMgr.registerGlobalStateStores(topology.globalStateStores());
+
+ // initialize transactions if eos is turned on, which will block if the previous transaction has not
+ // completed yet; do not start the first transaction until the topology has been initialized later
if (eosEnabled) {
- try {
- this.producer.initTransactions();
- this.producer.beginTransaction();
- } catch (final ProducerFencedException fatal) {
- throw new TaskMigratedException(this, fatal);
- }
- transactionInFlight = true;
+ this.producer.initTransactions();
}
}
@@ -167,31 +163,38 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
return changelogPartitions().isEmpty();
}
+ /**
+ * <pre>
+ * - (re-)initialize the topology of the task
+ * </pre>
+ * @throws TaskMigratedException if the task producer got fenced (EOS only)
+ */
@Override
public void initializeTopology() {
initTopology();
+
+ if (eosEnabled) {
+ try {
+ this.producer.beginTransaction();
+ } catch (final ProducerFencedException fatal) {
+ throw new TaskMigratedException(this, fatal);
+ }
+ transactionInFlight = true;
+ }
+
processorContext.initialized();
taskInitialized = true;
}
/**
* <pre>
- * - re-initialize the task
- * - if (eos) begin new transaction
+ * - resume the task
* </pre>
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
@Override
public void resume() {
+ // nothing to do; new transaction will be started only after topology is initialized
log.debug("Resuming");
- if (eosEnabled) {
- try {
- producer.beginTransaction();
- } catch (final ProducerFencedException fatal) {
- throw new TaskMigratedException(this, fatal);
- }
- transactionInFlight = true;
- }
}
/**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cb133c6..2937fc7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,9 +346,6 @@ public class StreamThread extends Thread {
return stateDirectory;
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
Collection<T> createTasks(final Consumer<byte[], byte[]> consumer, final Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
final List<T> createdTasks = new ArrayList<>();
for (final Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
@@ -401,9 +398,6 @@ public class StreamThread extends Thread {
this.threadClientId = threadClientId;
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
@Override
StreamTask createTask(final Consumer<byte[], byte[]> consumer, final TaskId taskId, final Set<TopicPartition> partitions) {
taskCreatedSensor.record();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62ddacf..72d7679 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -94,9 +94,6 @@ class TaskManager {
this.adminClient = adminClient;
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
void createTasks(final Collection<TopicPartition> assignment) {
if (consumer == null) {
throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
@@ -114,9 +111,6 @@ class TaskManager {
consumer.pause(partitions);
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
private void addStreamTasks(final Collection<TopicPartition> assignment) {
if (assignedActiveTasks.isEmpty()) {
return;
@@ -156,9 +150,6 @@ class TaskManager {
}
}
- /**
- * @throws TaskMigratedException if the task producer got fenced (EOS only)
- */
private void addStandbyTasks() {
final Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = this.assignedStandbyTasks;
if (assignedStandbyTasks.isEmpty()) {
@@ -173,7 +164,6 @@ class TaskManager {
if (!standby.maybeResumeSuspendedTask(taskId, partitions)) {
newStandbyTasks.put(taskId, partitions);
}
-
}
if (newStandbyTasks.isEmpty()) {
@@ -320,7 +310,7 @@ class TaskManager {
/**
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
- * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
+ * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
*/
boolean updateNewAndRestoringTasks() {
final Set<TopicPartition> resumed = active.initializeNewTasks();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 4bb7828..246d047 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -256,9 +256,10 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldCloseTaskOnResumeIfTaskMigratedException() {
+ public void shouldCloseTaskOnResumeSuspendedIfTaskMigratedException() {
mockRunningTaskSuspension();
t1.resume();
+ t1.initializeTopology();
EasyMock.expectLastCall().andThrow(new TaskMigratedException(t1));
t1.close(false, true);
EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1165d76..a305829 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -776,6 +776,7 @@ public class StreamTaskTest {
@Test
public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
assertTrue(producer.transactionInitialized());
assertTrue(producer.transactionInFlight());
@@ -792,6 +793,7 @@ public class StreamTaskTest {
@Test
public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -806,6 +808,7 @@ public class StreamTaskTest {
@Test
public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
task.suspend();
assertTrue(producer.transactionCommitted());
@@ -828,6 +831,7 @@ public class StreamTaskTest {
@Test
public void shouldStartNewTransactionOnResumeIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -835,6 +839,7 @@ public class StreamTaskTest {
task.suspend();
task.resume();
+ task.initializeTopology();
assertTrue(producer.transactionInFlight());
}
@@ -854,6 +859,7 @@ public class StreamTaskTest {
@Test
public void shouldStartNewTransactionOnCommitIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -878,6 +884,7 @@ public class StreamTaskTest {
@Test
public void shouldAbortTransactionOnDirtyClosedIfEosEnabled() {
task = createStatelessTask(true);
+ task.initializeTopology();
task.close(false, false);
task = null;
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.