You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2022/06/23 17:46:23 UTC
[kafka] branch trunk updated: KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 925c628173 KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
925c628173 is described below
commit 925c6281733662cd40fffaab54a6483b00f80ee6
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Thu Jun 23 10:46:14 2022 -0700
KAFKA-10199: Commit the restoration progress within StateUpdater (#12279)
During restoring, we should always commit a.k.a. write checkpoint file regardless of EOS or ALOS, since if there's a failure we would just over-restore them upon recovery so no EOS violations happened.
Also when we complete restore or remove task, we should enforce a checkpoint as well; for failing cases though, we should not write a new one.
Reviewers: Bruno Cadonna <ca...@apache.org>
---
.../streams/processor/internals/AbstractTask.java | 4 +-
.../processor/internals/DefaultStateUpdater.java | 36 +++++++-
.../streams/processor/internals/StandbyTask.java | 2 +-
.../streams/processor/internals/StreamTask.java | 8 +-
.../kafka/streams/processor/internals/Task.java | 6 ++
.../internals/DefaultStateUpdaterTest.java | 101 +++++++++++++++++++--
.../processor/internals/StandbyTaskTest.java | 41 +++++++++
.../processor/internals/StreamTaskTest.java | 16 ++++
8 files changed, 197 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 4e652a6dfc..c64fadfe5c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -88,7 +88,8 @@ public abstract class AbstractTask implements Task {
* @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
* or flushing state store get IO errors; such error should cause the thread to die
*/
- protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+ @Override
+ public void maybeCheckpoint(final boolean enforceCheckpoint) {
final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
// the state's current offset would be used to checkpoint
@@ -98,7 +99,6 @@ public abstract class AbstractTask implements Task {
}
}
-
@Override
public TaskId id() {
return id;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 54cb7bc427..cc580a3b38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
@@ -85,7 +86,7 @@ public class DefaultStateUpdater implements StateUpdater {
}
public boolean onlyStandbyTasksLeft() {
- return !updatingTasks.isEmpty() && updatingTasks.values().stream().allMatch(t -> !t.isActive());
+ return !updatingTasks.isEmpty() && updatingTasks.values().stream().noneMatch(Task::isActive);
}
@Override
@@ -111,6 +112,7 @@ public class DefaultStateUpdater implements StateUpdater {
private void runOnce() throws InterruptedException {
performActionsOnTasks();
restoreTasks();
+ maybeCheckpointUpdatingTasks(time.milliseconds());
waitIfAllChangelogsCompletelyRead();
}
@@ -252,6 +254,8 @@ public class DefaultStateUpdater implements StateUpdater {
private void removeTask(final TaskId taskId) {
final Task task = updatingTasks.remove(taskId);
if (task != null) {
+ task.maybeCheckpoint(true);
+
final Collection<TopicPartition> changelogPartitions = task.changelogPartitions();
changelogReader.unregister(changelogPartitions);
removedTasks.add(task);
@@ -271,9 +275,10 @@ public class DefaultStateUpdater implements StateUpdater {
final Collection<TopicPartition> taskChangelogPartitions = task.changelogPartitions();
if (restoredChangelogs.containsAll(taskChangelogPartitions)) {
task.completeRestoration(offsetResetter);
- log.debug("Stateful active task " + task.id() + " completed restoration");
+ task.maybeCheckpoint(true);
addTaskToRestoredTasks(task);
updatingTasks.remove(task.id());
+ log.debug("Stateful active task " + task.id() + " completed restoration");
if (onlyStandbyTasksLeft()) {
changelogReader.transitToUpdateStandby();
}
@@ -290,6 +295,23 @@ public class DefaultStateUpdater implements StateUpdater {
restoredActiveTasksLock.unlock();
}
}
+
+ private void maybeCheckpointUpdatingTasks(final long now) {
+ final long elapsedMsSinceLastCommit = now - lastCommitMs;
+ if (elapsedMsSinceLastCommit > commitIntervalMs) {
+ if (log.isDebugEnabled()) {
+ log.debug("Committing all restoring tasks since {}ms has elapsed (commit interval is {}ms)",
+ elapsedMsSinceLastCommit, commitIntervalMs);
+ }
+
+ for (final Task task : updatingTasks.values()) {
+ // do not enforce checkpointing during restoration if its position has not advanced much
+ task.maybeCheckpoint(false);
+ }
+
+ lastCommitMs = now;
+ }
+ }
}
private final Time time;
@@ -305,14 +327,22 @@ public class DefaultStateUpdater implements StateUpdater {
private final BlockingQueue<Task> removedTasks = new LinkedBlockingQueue<>();
private CountDownLatch shutdownGate;
+ private final long commitIntervalMs;
+ private long lastCommitMs;
+
private StateUpdaterThread stateUpdaterThread = null;
- public DefaultStateUpdater(final ChangelogReader changelogReader,
+ public DefaultStateUpdater(final StreamsConfig config,
+ final ChangelogReader changelogReader,
final Consumer<Set<TopicPartition>> offsetResetter,
final Time time) {
this.changelogReader = changelogReader;
this.offsetResetter = offsetResetter;
this.time = time;
+
+ this.commitIntervalMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
+ // initialize the last commit as of now to prevent first commit happens immediately
+ this.lastCommitMs = time.milliseconds();
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index ea946b2341..670c0c4beb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -194,7 +194,7 @@ public class StandbyTask extends AbstractTask implements Task {
case RUNNING:
case SUSPENDED:
- maybeWriteCheckpoint(enforceCheckpoint);
+ maybeCheckpoint(enforceCheckpoint);
log.debug("Finalized commit for {} task", state());
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index ea593a2973..8514c6ae2e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -483,14 +483,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
case RESTORING:
case SUSPENDED:
- maybeWriteCheckpoint(enforceCheckpoint);
+ maybeCheckpoint(enforceCheckpoint);
log.debug("Finalized commit for {} task with enforce checkpoint {}", state(), enforceCheckpoint);
break;
case RUNNING:
if (enforceCheckpoint || !eosEnabled) {
- maybeWriteCheckpoint(enforceCheckpoint);
+ maybeCheckpoint(enforceCheckpoint);
}
log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", state(), eosEnabled, enforceCheckpoint);
@@ -582,14 +582,14 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
* or flushing state store get IO errors; such error should cause the thread to die
*/
@Override
- protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+ public void maybeCheckpoint(final boolean enforceCheckpoint) {
// commitNeeded indicates we may have processed some records since last commit
// and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not
if (commitNeeded || enforceCheckpoint) {
stateMgr.updateChangelogOffsets(checkpointableOffsets());
}
- super.maybeWriteCheckpoint(enforceCheckpoint);
+ super.maybeCheckpoint(enforceCheckpoint);
}
private void validateClean() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index fc3e6cb1a8..a17b19997b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -141,6 +141,12 @@ public interface Task {
*/
void updateInputPartitions(final Set<TopicPartition> topicPartitions, final Map<String, List<String>> allTopologyNodesToSourceTopics);
+ /**
+ * @param enforceCheckpoint if true the task would always execute the checkpoint;
+ * otherwise it may skip if the state has not advanced much
+ */
+ void maybeCheckpoint(final boolean enforceCheckpoint);
+
void markChangelogAsCorrupted(final Collection<TopicPartition> partitions);
/**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index fa50380f7f..8f0fc935a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -16,8 +16,10 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
@@ -35,12 +37,17 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.common.utils.Utils.sleep;
+import static org.apache.kafka.streams.StreamsConfig.producerPrefix;
import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.easymock.EasyMock.anyBoolean;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -58,6 +65,7 @@ import static org.mockito.Mockito.when;
class DefaultStateUpdaterTest {
+ private final static int COMMIT_INTERVAL = 100;
private final static long CALL_TIMEOUT = 1000;
private final static long VERIFICATION_TIMEOUT = 15000;
private final static TopicPartition TOPIC_PARTITION_A_0 = new TopicPartition("topicA", 0);
@@ -69,15 +77,27 @@ class DefaultStateUpdaterTest {
private final static TaskId TASK_1_0 = new TaskId(1, 0);
private final static TaskId TASK_1_1 = new TaskId(1, 1);
+ private final StreamsConfig config = new StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader = mock(ChangelogReader.class);
private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter = topicPartitions -> { };
- private final DefaultStateUpdater stateUpdater = new DefaultStateUpdater(changelogReader, offsetResetter, Time.SYSTEM);
+
+ private DefaultStateUpdater stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
@AfterEach
public void tearDown() {
stateUpdater.shutdown(Duration.ofMinutes(1));
}
+ private Properties configProps(final int commitInterval) {
+ return mkObjectProperties(mkMap(
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, "appId"),
+ mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"),
+ mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2),
+ mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, commitInterval),
+ mkEntry(producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), commitInterval)
+ ));
+ }
+
@Test
public void shouldShutdownStateUpdater() {
final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
@@ -152,6 +172,7 @@ class DefaultStateUpdaterTest {
}
verifyRestoredActiveTasks(tasks);
+ verifyNeverCheckpointTasks(tasks);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
@@ -173,6 +194,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task);
verifyRestoredActiveTasks(task);
+ verifyCheckpointTasks(true, task);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
@@ -203,6 +225,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task3);
verifyRestoredActiveTasks(task3, task1, task2);
+ verifyCheckpointTasks(true, task3, task1, task2);
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
@@ -286,6 +309,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task4);
verifyRestoredActiveTasks(task2, task1);
+ verifyCheckpointTasks(true, task2, task1);
verifyUpdatingStandbyTasks(task4, task3);
verifyExceptionsAndFailedTasks();
verifyRemovedTasks();
@@ -313,6 +337,7 @@ class DefaultStateUpdaterTest {
stateUpdater.add(task2);
verifyRestoredActiveTasks(task1);
+ verifyCheckpointTasks(true, task1);
verify(task1).completeRestoration(offsetResetter);
verifyUpdatingStandbyTasks(task2);
final InOrder orderVerifier = inOrder(changelogReader);
@@ -346,31 +371,32 @@ class DefaultStateUpdaterTest {
.thenReturn(false);
stateUpdater.add(task);
- stateUpdater.remove(TASK_0_0);
+ stateUpdater.remove(task.id());
verifyRemovedTasks(task);
+ verifyCheckpointTasks(true, task);
verifyRestoredActiveTasks();
verifyUpdatingTasks();
verifyExceptionsAndFailedTasks();
- verify(changelogReader).unregister(Collections.singletonList(TOPIC_PARTITION_A_0));
+ verify(changelogReader).unregister(task.changelogPartitions());
}
@Test
public void shouldNotRemoveActiveStatefulTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
- shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.singleton(TOPIC_PARTITION_A_0));
}
@Test
public void shouldNotRemoveStatelessTaskFromRestoredActiveTasks() throws Exception {
final StreamTask task = createStatelessTaskInStateRestoring(TASK_0_0);
- shouldNotRemoveTaskFromRestoredActiveTasks(task);
+ shouldNotRemoveTaskFromRestoredActiveTasks(task, Collections.emptySet());
}
- private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task) throws Exception {
+ private void shouldNotRemoveTaskFromRestoredActiveTasks(final StreamTask task, final Set<TopicPartition> completedChangelogs) throws Exception {
final StreamTask controlTask = createActiveStatefulTaskInStateRestoring(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_B_0));
when(changelogReader.completedChangelogs())
- .thenReturn(Collections.singleton(TOPIC_PARTITION_A_0));
+ .thenReturn(completedChangelogs);
when(changelogReader.allChangelogsCompleted())
.thenReturn(false);
stateUpdater.add(task);
@@ -603,6 +629,67 @@ class DefaultStateUpdaterTest {
verifyDrainingExceptionsAndFailedTasks(expectedExceptionAndTasks2, expectedExceptionAndTasks3, expectedExceptionAndTasks4);
}
+ @Test
+ public void shouldAutoCheckpointTasksOnInterval() throws Exception {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+ final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+
+ stateUpdater.add(task1);
+ stateUpdater.add(task2);
+ stateUpdater.add(task3);
+ stateUpdater.add(task4);
+
+ sleep(COMMIT_INTERVAL);
+
+ verifyExceptionsAndFailedTasks();
+ verifyCheckpointTasks(false, task1, task2, task3, task4);
+ }
+
+ @Test
+ public void shouldNotAutoCheckpointTasksIfIntervalNotElapsed() {
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+ final StreamsConfig config = new StreamsConfig(configProps(Integer.MAX_VALUE));
+ stateUpdater = new DefaultStateUpdater(config, changelogReader, offsetResetter, Time.SYSTEM);
+
+ try {
+ final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+ final StreamTask task2 = createActiveStatefulTaskInStateRestoring(TASK_0_2, Collections.singletonList(TOPIC_PARTITION_B_0));
+ final StandbyTask task3 = createStandbyTaskInStateRunning(TASK_1_0, Collections.singletonList(TOPIC_PARTITION_C_0));
+ final StandbyTask task4 = createStandbyTaskInStateRunning(TASK_1_1, Collections.singletonList(TOPIC_PARTITION_D_0));
+ when(changelogReader.completedChangelogs())
+ .thenReturn(Collections.emptySet());
+ when(changelogReader.allChangelogsCompleted())
+ .thenReturn(false);
+
+ stateUpdater.add(task1);
+ stateUpdater.add(task2);
+ stateUpdater.add(task3);
+ stateUpdater.add(task4);
+
+ verifyNeverCheckpointTasks(task1, task2, task3, task4);
+ } finally {
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+ }
+ }
+
+ private void verifyCheckpointTasks(final boolean enforceCheckpoint, final Task... tasks) {
+ for (final Task task : tasks) {
+ verify(task, timeout(VERIFICATION_TIMEOUT).atLeast(1)).maybeCheckpoint(enforceCheckpoint);
+ }
+ }
+
+ private void verifyNeverCheckpointTasks(final Task... tasks) {
+ for (final Task task : tasks) {
+ verify(task, never()).maybeCheckpoint(anyBoolean());
+ }
+ }
+
private void verifyRestoredActiveTasks(final StreamTask... tasks) throws Exception {
if (tasks.length == 0) {
assertTrue(stateUpdater.getRestoredActiveTasks().isEmpty());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b6bb3b5e7a..43812020bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -206,6 +206,47 @@ public class StandbyTaskTest {
assertThrows(IllegalStateException.class, task::prepareCommit);
}
+
+ @Test
+ public void shouldAlwaysCheckpointStateIfEnforced() {
+ stateManager.flush();
+ EasyMock.expectLastCall().once();
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
+ EasyMock.replay(stateManager);
+
+ task = createStandbyTask();
+
+ task.initializeIfNeeded();
+ task.maybeCheckpoint(true);
+
+ EasyMock.verify(stateManager);
+ }
+
+ @Test
+ public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
+ EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
+ stateManager.flush();
+ EasyMock.expectLastCall();
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().once();
+ EasyMock.expect(stateManager.changelogOffsets())
+ .andReturn(Collections.singletonMap(partition, 50L))
+ .andReturn(Collections.singletonMap(partition, 11000L))
+ .andReturn(Collections.singletonMap(partition, 11000L));
+ EasyMock.replay(stateManager);
+
+ task = createStandbyTask();
+ task.initializeIfNeeded();
+
+ task.maybeCheckpoint(false); // this should not checkpoint
+ task.maybeCheckpoint(false); // this should checkpoint
+ task.maybeCheckpoint(false); // this should not checkpoint
+
+ EasyMock.verify(stateManager);
+ }
+
@Test
public void shouldFlushAndCheckpointStateManagerOnCommit() {
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 9974fa92b6..2fb87a5ae9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1945,6 +1945,22 @@ public class StreamTaskTest {
EasyMock.verify(stateManager);
}
+ @Test
+ public void shouldCheckpointState() {
+ stateManager.flush();
+ EasyMock.expectLastCall().once();
+ stateManager.checkpoint();
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(stateManager);
+
+ task = createOptimizedStatefulTask(createConfig("100"), consumer);
+
+ task.initializeIfNeeded();
+ task.maybeCheckpoint(true);
+
+ EasyMock.verify(stateManager);
+ }
+
@Test
public void shouldCheckpointOffsetsOnPostCommit() {
final long offset = 543L;