You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ca...@apache.org on 2022/07/06 10:36:23 UTC
[kafka] branch trunk updated: KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00f395bb889 KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
00f395bb889 is described below
commit 00f395bb889252e3efb62cedfaa1209b58d5fd3c
Author: Bruno Cadonna <ca...@apache.org>
AuthorDate: Wed Jul 6 12:36:15 2022 +0200
KAFKA-10199: Remove call to Task#completeRestoration from state updater (#12379)
The call to Task#completeRestoration calls methods on the main consumer.
The state updater thread should not access the main consumer since the
main consumer is not thread-safe. Additionally, Task#completeRestoration
changed the state of active tasks, but we decided to keep task life cycle
management outside of the state updater.
Task#completeRestoration should be called by the stream thread on
restored active tasks returned by the state udpater.
Reviewer: Guozhang Wang <gu...@apache.org>
---
.../processor/internals/DefaultStateUpdater.java | 13 ++-------
.../internals/DefaultStateUpdaterTest.java | 31 +++++-----------------
2 files changed, 9 insertions(+), 35 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 886a37b3140..3969a911f33 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -60,16 +59,12 @@ public class DefaultStateUpdater implements StateUpdater {
private final ChangelogReader changelogReader;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
- private final Consumer<Set<TopicPartition>> offsetResetter;
private final Map<TaskId, Task> updatingTasks = new ConcurrentHashMap<>();
private final Logger log;
- public StateUpdaterThread(final String name,
- final ChangelogReader changelogReader,
- final Consumer<Set<TopicPartition>> offsetResetter) {
+ public StateUpdaterThread(final String name, final ChangelogReader changelogReader) {
super(name);
this.changelogReader = changelogReader;
- this.offsetResetter = offsetResetter;
final String logPrefix = String.format("%s ", name);
final LogContext logContext = new LogContext(logPrefix);
@@ -286,7 +281,6 @@ public class DefaultStateUpdater implements StateUpdater {
final Set<TopicPartition> restoredChangelogs) {
final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
- task.completeRestoration(offsetResetter);
task.maybeCheckpoint(true);
addToRestoredTasks(task);
updatingTasks.remove(task.id());
@@ -332,7 +326,6 @@ public class DefaultStateUpdater implements StateUpdater {
private final Time time;
private final ChangelogReader changelogReader;
- private final Consumer<Set<TopicPartition>> offsetResetter;
private final Queue<TaskAndAction> tasksAndActions = new LinkedList<>();
private final Lock tasksAndActionsLock = new ReentrantLock();
private final Condition tasksAndActionsCondition = tasksAndActionsLock.newCondition();
@@ -350,17 +343,15 @@ public class DefaultStateUpdater implements StateUpdater {
public DefaultStateUpdater(final StreamsConfig config,
final ChangelogReader changelogReader,
- final Consumer<Set<TopicPartition>> offsetResetter,
final Time time) {
this.changelogReader = changelogReader;
- this.offsetResetter = offsetResetter;
this.time = time;
this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
}
public void start() {
if (stateUpdaterThread == null) {
- stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader, offsetResetter);
+ stateUpdaterThread = new StateUpdaterThread("state-updater", changelogReader);
stateUpdaterThread.start();
shutdownGate = new CountDownLatch(1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 5e2d90de71d..e5962274827 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -84,7 +84,7 @@ class DefaultStateUpdaterTest {
private final StreamsConfig config = new StreamsConfig(configProps());
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
- private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time);
+ private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
@AfterEach
public void tearDown() {
@@ -93,11 +93,11 @@ class DefaultStateUpdaterTest {
private Properties configProps() {
return mkObjectProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
- mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
- mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
- mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL),
- mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL)
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL),
+ mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), COMMIT_INTERVAL)
));
}
@@ -202,7 +202,6 @@ class DefaultStateUpdaterTest {
verifyRemovedTasks();
verify(changelogReader, times(1)).enforceRestoreActive();
verify(changelogReader, atLeast(3)).restore(anyMap());
- verify(task).completeRestoration(offsetResetter);
verify(changelogReader, never()).transitToUpdateStandby();
}
@@ -234,9 +233,6 @@ class DefaultStateUpdaterTest {
verifyRemovedTasks();
verify(changelogReader, times(3)).enforceRestoreActive();
verify(changelogReader, atLeast(4)).restore(anyMap());
- verify(task3).completeRestoration(offsetResetter);
- verify(task1).completeRestoration(offsetResetter);
- verify(task2).completeRestoration(offsetResetter);
verify(changelogReader, never()).transitToUpdateStandby();
}
@@ -318,8 +314,6 @@ class DefaultStateUpdaterTest {
verifyUpdatingStandbyTasks(task4, task3);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
- verify(task1).completeRestoration(offsetResetter);
- verify(task2).completeRestoration(offsetResetter);
verify(changelogReader, atLeast(3)).restore(anyMap());
final InOrder orderVerifier = inOrder(changelogReader, task1, task2);
orderVerifier.verify(changelogReader, times(2)).enforceRestoreActive();
@@ -343,7 +337,6 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks(task1);
verifyCheckpointTasks(true, task1);
- verify(task1).completeRestoration(offsetResetter);
verifyUpdatingStandbyTasks(task2);
final InOrder orderVerifier = inOrder(changelogReader);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
@@ -352,7 +345,6 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task3);
verifyRestoredActiveTasks(task1, task3);
- verify(task3).completeRestoration(offsetResetter);
orderVerifier.verify(changelogReader, times(1)).enforceRestoreActive();
orderVerifier.verify(changelogReader, times(1)).transitToUpdateStandby();
}
@@ -715,7 +707,7 @@ class DefaultStateUpdaterTest {
public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
// we need to use a non auto-ticking timer here to control how much time elapsed exactly
final Time time = new MockTime();
- final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, time);
+ final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, time);
try {
final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
@@ -922,7 +914,6 @@ class DefaultStateUpdaterTest {
VERIFICATION_TIMEOUT,
"Did not get all restored active task within the given timeout!"
);
- assertTrue(restoredTasks.stream().allMatch(task -> task.state() == State.RESTORING));
}
}
@@ -956,10 +947,6 @@ class DefaultStateUpdaterTest {
VERIFICATION_TIMEOUT,
"Did not get all updating task within the given timeout!"
);
- assertTrue(updatingTasks.stream()
- .allMatch(task -> task.isActive() && task.state() == State.RESTORING
- ||
- !task.isActive() && task.state() == State.RUNNING));
}
}
@@ -975,7 +962,6 @@ class DefaultStateUpdaterTest {
VERIFICATION_TIMEOUT,
"Did not see all standby task within the given timeout!"
);
- assertTrue(standbyTasks.stream().allMatch(task -> task.state() == State.RUNNING));
}
private void verifyRemovedTasks(final Task... tasks) throws Exception {
@@ -993,9 +979,6 @@ class DefaultStateUpdaterTest {
VERIFICATION_TIMEOUT,
"Did not get all removed task within the given timeout!"
);
- assertTrue(removedTasks.stream()
- .allMatch(task -> task.isActive() && task.state() == State.RESTORING
- || !task.isActive() && task.state() == State.RUNNING));
}
}