You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/06/07 00:34:48 UTC
[kafka] branch trunk updated: KAFKA-10097: Internalize checkpoint data (#8820)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4ef46c KAFKA-10097: Internalize checkpoint data (#8820)
d4ef46c is described below
commit d4ef46c69dbc525f55a6d9ae5c9ae6da795f98fb
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Sat Jun 6 17:34:03 2020 -0700
KAFKA-10097: Internalize checkpoint data (#8820)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StandbyTask.java | 8 +---
.../streams/processor/internals/StreamTask.java | 54 ++++++++++++----------
.../kafka/streams/processor/internals/Task.java | 4 +-
.../streams/processor/internals/TaskManager.java | 33 ++++++-------
.../processor/internals/StandbyTaskTest.java | 9 ++--
.../processor/internals/StreamTaskTest.java | 26 +++++------
.../processor/internals/TaskManagerTest.java | 28 ++++++-----
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
8 files changed, 80 insertions(+), 86 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 8cba911..41598d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
/**
@@ -153,12 +152,10 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
prepareClose(true);
log.info("Prepared clean close");
-
- return Collections.emptyMap();
}
@Override
@@ -199,8 +196,7 @@ public class StandbyTask extends AbstractTask implements Task {
}
@Override
- public void closeClean(final Map<TopicPartition, Long> checkpoint) {
- Objects.requireNonNull(checkpoint);
+ public void closeClean() {
close(true);
log.info("Closed clean");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index d045144..b23a1a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -107,6 +107,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private boolean commitNeeded = false;
private boolean commitRequested = false;
+ private Map<TopicPartition, Long> checkpoint = null;
+
public StreamTask(final TaskId id,
final Set<TopicPartition> partitions,
final ProcessorTopology topology,
@@ -465,17 +467,15 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
}
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
- final Map<TopicPartition, Long> checkpoint = prepareClose(true);
+ public void prepareCloseClean() {
+ prepareClose(true);
log.info("Prepared clean close");
-
- return checkpoint;
}
@Override
- public void closeClean(final Map<TopicPartition, Long> checkpoint) {
- close(true, checkpoint);
+ public void closeClean() {
+ close(true);
log.info("Closed clean");
}
@@ -489,7 +489,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public void closeDirty() {
- close(false, null);
+
+ close(false);
log.info("Closed dirty");
}
@@ -505,11 +506,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public void closeAndRecycleState() {
- final Map<TopicPartition, Long> checkpoint = prepareClose(true);
+ prepareClose(true);
+
+ writeCheckpointIfNeed();
- if (checkpoint != null) {
- stateMgr.checkpoint(checkpoint);
- }
switch (state()) {
case CREATED:
case RUNNING:
@@ -546,14 +546,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
* otherwise, just close open resources
* @throws TaskMigratedException if the task producer got fenced (EOS)
*/
- private Map<TopicPartition, Long> prepareClose(final boolean clean) {
- final Map<TopicPartition, Long> checkpoint;
+ private void prepareClose(final boolean clean) {
+ // Reset any previously scheduled checkpoint.
+ checkpoint = null;
switch (state()) {
case CREATED:
// the task is created and not initialized, just re-write the checkpoint file
- checkpoint = Collections.emptyMap();
-
+ scheduleCheckpoint(emptyMap());
break;
case RUNNING:
@@ -562,9 +562,8 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
if (clean) {
stateMgr.flush();
recordCollector.flush();
- checkpoint = checkpointableOffsets();
+ scheduleCheckpoint(checkpointableOffsets());
} else {
- checkpoint = null; // `null` indicates to not write a checkpoint
executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log);
}
@@ -572,22 +571,29 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
case RESTORING:
executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log);
- checkpoint = Collections.emptyMap();
+ scheduleCheckpoint(emptyMap());
break;
case SUSPENDED:
case CLOSED:
// not need to checkpoint, since when suspending we've already committed the state
- checkpoint = null; // `null` indicates to not write a checkpoint
-
break;
default:
throw new IllegalStateException("Unknown state " + state() + " while prepare closing active task " + id);
}
+ }
+
+ private void scheduleCheckpoint(final Map<TopicPartition, Long> checkpoint) {
+ this.checkpoint = checkpoint;
+ }
- return checkpoint;
+ private void writeCheckpointIfNeed() {
+ if (checkpoint != null) {
+ stateMgr.checkpoint(checkpoint);
+ checkpoint = null;
+ }
}
/**
@@ -598,9 +604,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
* 3. finally release the state manager lock
* </pre>
*/
- private void close(final boolean clean, final Map<TopicPartition, Long> checkpoint) {
- if (clean && checkpoint != null) {
- executeAndMaybeSwallow(clean, () -> stateMgr.checkpoint(checkpoint), "state manager checkpoint", log);
+ private void close(final boolean clean) {
+ if (clean) {
+ executeAndMaybeSwallow(true, this::writeCheckpointIfNeed, "state manager checkpoint", log);
}
switch (state()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index eee290a..9283e86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -145,12 +145,12 @@ public interface Task {
*
* @throws StreamsException fatal error, should close the thread
*/
- Map<TopicPartition, Long> prepareCloseClean();
+ void prepareCloseClean();
/**
* Must be idempotent.
*/
- void closeClean(final Map<TopicPartition, Long> checkpoint);
+ void closeClean();
/**
* Prepare to close a task that we may not own. Discard any uncommitted progress and close the task.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5c55093..12361d7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -190,7 +190,7 @@ public class TaskManager {
// first rectify all existing tasks
final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
- final Map<Task, Map<TopicPartition, Long>> checkpointPerTask = new HashMap<>();
+ final Set<Task> tasksToClose = new HashSet<>();
final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
final Set<Task> additionalTasksForCommitting = new HashSet<>();
final Set<Task> dirtyTasks = new HashSet<>();
@@ -210,11 +210,11 @@ public class TaskManager {
tasksToRecycle.add(task);
} else {
try {
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
+ task.prepareCloseClean();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task
.committableOffsetsAndMetadata();
- checkpointPerTask.put(task, checkpoint);
+ tasksToClose.add(task);
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
}
@@ -250,20 +250,17 @@ public class TaskManager {
log.error("Failed to batch commit tasks, " +
"will close all tasks involved in this commit as dirty by the end", e);
dirtyTasks.addAll(additionalTasksForCommitting);
- dirtyTasks.addAll(checkpointPerTask.keySet());
+ dirtyTasks.addAll(tasksToClose);
- checkpointPerTask.clear();
+ tasksToClose.clear();
// Just add first taskId to re-throw by the end.
taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
}
}
- for (final Map.Entry<Task, Map<TopicPartition, Long>> taskAndCheckpoint : checkpointPerTask.entrySet()) {
- final Task task = taskAndCheckpoint.getKey();
- final Map<TopicPartition, Long> checkpoint = taskAndCheckpoint.getValue();
-
+ for (final Task task : tasksToClose) {
try {
- completeTaskCloseClean(task, checkpoint);
+ completeTaskCloseClean(task);
cleanUpTaskProducer(task, taskCloseExceptions);
tasks.remove(task.id());
} catch (final RuntimeException e) {
@@ -631,9 +628,9 @@ public class TaskManager {
task.closeDirty();
}
- private void completeTaskCloseClean(final Task task, final Map<TopicPartition, Long> checkpoint) {
+ private void completeTaskCloseClean(final Task task) {
cleanupTask(task);
- task.closeClean(checkpoint);
+ task.closeClean();
}
// Note: this MUST be called *before* actually closing the task
@@ -652,16 +649,16 @@ public class TaskManager {
void shutdown(final boolean clean) {
final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
- final Map<Task, Map<TopicPartition, Long>> checkpointPerTask = new HashMap<>();
+ final Set<Task> tasksToClose = new HashSet<>();
final Map<TaskId, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsAndMetadataPerTask = new HashMap<>();
for (final Task task : tasks.values()) {
if (clean) {
try {
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
+ task.prepareCloseClean();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.committableOffsetsAndMetadata();
- checkpointPerTask.put(task, checkpoint);
+ tasksToClose.add(task);
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
}
@@ -681,11 +678,9 @@ public class TaskManager {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
}
- for (final Map.Entry<Task, Map<TopicPartition, Long>> taskAndCheckpoint : checkpointPerTask.entrySet()) {
- final Task task = taskAndCheckpoint.getKey();
- final Map<TopicPartition, Long> checkpoint = taskAndCheckpoint.getValue();
+ for (final Task task : tasksToClose) {
try {
- completeTaskCloseClean(task, checkpoint);
+ completeTaskCloseClean(task);
} catch (final RuntimeException e) {
firstException.compareAndSet(null, e);
closeTaskDirty(task);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f868de4..04ef951 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -52,7 +52,6 @@ import org.junit.runner.RunWith;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
-import java.util.Map;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -270,8 +269,8 @@ public class StandbyTaskTest {
task = createStandbyTask();
task.initializeIfNeeded();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
@@ -323,8 +322,8 @@ public class StandbyTaskTest {
task = createStandbyTask();
task.initializeIfNeeded();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- assertThrows(RuntimeException.class, () -> task.closeClean(checkpoint));
+ task.prepareCloseClean();
+ assertThrows(RuntimeException.class, () -> task.closeClean());
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a4431a2..27135f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1567,8 +1567,8 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
assertFalse(source1.initialized);
@@ -1642,8 +1642,8 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
@@ -1668,8 +1668,8 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
task.completeRestoration();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
assertEquals(Task.State.CLOSED, task.state());
@@ -1696,8 +1696,8 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+ task.prepareCloseClean();
+ assertThrows(ProcessorStateException.class, () -> task.closeClean());
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
@@ -1760,8 +1760,8 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
task.initializeIfNeeded();
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- assertThrows(ProcessorStateException.class, () -> task.closeClean(checkpoint));
+ task.prepareCloseClean();
+ assertThrows(ProcessorStateException.class, () -> task.closeClean());
assertEquals(Task.State.RESTORING, task.state());
@@ -1795,11 +1795,11 @@ public class StreamTaskTest {
task = createOptimizedStatefulTask(createConfig(false, "100"), consumer);
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
// close calls are idempotent since we are already in closed
- task.closeClean(checkpoint);
+ task.closeClean();
task.closeDirty();
EasyMock.reset(stateManager);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7f31d7e..200d841 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -604,7 +604,7 @@ public class TaskManagerTest {
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new RuntimeException("oops");
}
};
@@ -1115,7 +1115,7 @@ public class TaskManagerTest {
final AtomicBoolean closedDirtyTask03 = new AtomicBoolean(false);
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("migrated", new RuntimeException("cause"));
}
@@ -1133,7 +1133,7 @@ public class TaskManagerTest {
};
final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new RuntimeException("oops");
}
@@ -1451,13 +1451,13 @@ public class TaskManagerTest {
};
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("migrated", new RuntimeException("cause"));
}
};
final Task task02 = new StateMachineTask(taskId02, taskId02Partitions, true) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new RuntimeException("oops");
}
};
@@ -2273,14 +2273,14 @@ public class TaskManagerTest {
public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("t1 close exception", new RuntimeException());
}
};
final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("t2 close exception", new RuntimeException());
}
};
@@ -2303,14 +2303,14 @@ public class TaskManagerTest {
public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("t1 close exception", new RuntimeException());
}
};
final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
}
};
@@ -2332,14 +2332,14 @@ public class TaskManagerTest {
public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, taskId01Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new TaskMigratedException("t1 close exception", new RuntimeException());
}
};
final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, taskId02Partitions, false) {
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
+ public void prepareCloseClean() {
throw new KafkaException("Kaboom for t2!", new RuntimeException());
}
};
@@ -2703,15 +2703,13 @@ public class TaskManagerTest {
}
@Override
- public Map<TopicPartition, Long> prepareCloseClean() {
- return Collections.emptyMap();
- }
+ public void prepareCloseClean() {}
@Override
public void prepareCloseDirty() {}
@Override
- public void closeClean(final Map<TopicPartition, Long> checkpoint) {
+ public void closeClean() {
transitionTo(State.CLOSED);
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index db31012..3416ec0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1180,8 +1180,8 @@ public class TopologyTestDriver implements Closeable {
*/
public void close() {
if (task != null) {
- final Map<TopicPartition, Long> checkpoint = task.prepareCloseClean();
- task.closeClean(checkpoint);
+ task.prepareCloseClean();
+ task.closeClean();
}
if (globalStateTask != null) {
try {