Posted to commits@flink.apache.org by ro...@apache.org on 2022/08/09 10:26:03 UTC

[flink] branch master updated (0dc8890f1b8 -> 52eb7e76b5d)

This is an automated email from the ASF dual-hosted git repository.

roman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 0dc8890f1b8 [FLINK-28711][hive] Hive source supports dynamic filtering
     new 1f9632a0719 [FLINK-27693][changelog] Support local recovery for non-materialized part
     new 52eb7e76b5d [FLINK-27693][docs] Remove local recovery from the Limitations of changelog

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/content.zh/docs/ops/state/state_backends.md   |   2 -
 docs/content/docs/ops/state/state_backends.md      |   2 -
 .../fs/AbstractStateChangeFsUploader.java          | 120 ++++++++++++++
 .../fs/DuplicatingOutputStreamWithPos.java         | 174 +++++++++++++++++++++
 .../fs/DuplicatingStateChangeFsUploader.java       | 111 +++++++++++++
 .../changelog/fs/FsStateChangelogStorage.java      |  50 ++++--
 .../fs/FsStateChangelogStorageFactory.java         |   8 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++++++++++---
 .../flink/changelog/fs/OutputStreamWithPos.java    |  54 ++++++-
 .../flink/changelog/fs/StateChangeFormat.java      |  10 +-
 .../flink/changelog/fs/StateChangeFsUploader.java  | 104 ++----------
 .../changelog/fs/StateChangeUploadScheduler.java   |  34 ++--
 .../flink/changelog/fs/StateChangeUploader.java    |  33 +++-
 .../apache/flink/changelog/fs/UploadResult.java    |  41 ++++-
 .../fs/BatchingStateChangeUploadSchedulerTest.java |   6 +-
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  45 ++++--
 .../changelog/fs/FsStateChangelogStorageTest.java  |   7 +-
 .../fs/FsStateChangelogWriterSqnTest.java          |   6 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   |  25 ++-
 .../state/ChangelogTaskLocalStateStore.java        |  33 +++-
 .../TaskExecutorStateChangelogStoragesManager.java |   6 +-
 .../state/changelog/LocalChangelogRegistry.java    |  64 ++++++++
 .../changelog/LocalChangelogRegistryImpl.java      | 118 ++++++++++++++
 .../changelog/StateChangelogStorageFactory.java    |   6 +-
 .../changelog/StateChangelogStorageLoader.java     |   8 +-
 .../state/changelog/StateChangelogWriter.java      |  21 ++-
 .../InMemoryStateChangelogStorageFactory.java      |   6 +-
 .../inmemory/InMemoryStateChangelogWriter.java     |  15 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   5 +-
 .../state/ChangelogTaskLocalStateStoreTest.java    |   2 +
 ...kExecutorStateChangelogStoragesManagerTest.java |  60 +++++--
 .../changelog/LocalChangelogRegistryTest.java      |  56 +++++++
 .../inmemory/StateChangelogStorageLoaderTest.java  |  16 +-
 .../inmemory/StateChangelogStorageTest.java        |   7 +-
 ...cutorExecutionDeploymentReconciliationTest.java |  14 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |  14 ++
 .../changelog/ChangelogKeyedStateBackend.java      |  28 +++-
 .../state/changelog/ChangelogTruncateHelper.java   |   1 +
 .../changelog/ChangelogStateBackendTestUtils.java  |   4 +-
 .../state/changelog/ChangelogStateDiscardTest.java |  20 ++-
 .../state/changelog/StateChangeLoggerTestBase.java |   7 +-
 .../ChangelogLocalRecoveryITCase.java              |   6 +-
 42 files changed, 1219 insertions(+), 241 deletions(-)
 create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
 create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java
 create mode 100644 flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java


[flink] 02/02: [FLINK-27693][docs] Remove local recovery from the Limitations of changelog

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52eb7e76b5d66ff5c4d9d4af8a213b5b8f9f8322
Author: fredia <fr...@gmail.com>
AuthorDate: Mon Aug 8 23:45:45 2022 +0800

    [FLINK-27693][docs] Remove local recovery from the Limitations of changelog
---
 docs/content.zh/docs/ops/state/state_backends.md | 2 --
 docs/content/docs/ops/state/state_backends.md    | 2 --
 2 files changed, 4 deletions(-)
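
The hunks below drop `state.backend.local-recovery: false` from the "please keep the following defaults" snippet and delete the "Local recovery not supported" bullet from the limitations list; in other words, with FLINK-27693 the changelog state backend and local recovery may now be enabled together. A hedged configuration sketch of the now-permitted combination (option names are taken from the docs shown below, except `state.backend.changelog.enabled`, which is assumed from the surrounding docs page and is not part of this excerpt):

    state.backend.changelog.enabled: true                   # assumed option, see note above
    dstl.dfs.base-path: s3://<bucket-name>                   # similar to state.checkpoints.dir
    execution.checkpointing.max-concurrent-checkpoints: 1    # still required per the docs
    state.backend.local-recovery: true                       # no longer has to stay false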

diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index 01080f7efad..29606369f34 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -406,7 +406,6 @@ dstl.dfs.base-path: s3://<bucket-name> # 类似于 state.checkpoints.dir
 请将如下配置保持默认值 (参见[限制](#limitations)):
 ```yaml
 execution.checkpointing.max-concurrent-checkpoints: 1
-state.backend.local-recovery: false
 ```
 
 有关其他配置选项,请参阅[配置]({{< ref "docs/deployment/config#state-changelog-options" >}})部分。
@@ -465,7 +464,6 @@ env.enable_changelog_statebackend(true)
 
 ### 限制
 - 最多同时创建一个 checkpoint
-- 本地恢复暂不支持
 - 到 Flink 1.15 为止, 只有 `filesystem` changelog 实现可用
 - 尚不支持 [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) 模式
 
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 16d3b8f4515..9835c9fe52a 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -407,7 +407,6 @@ dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
 Please keep the following defaults (see [limitations](#limitations)):
 ```yaml
 execution.checkpointing.max-concurrent-checkpoints: 1
-state.backend.local-recovery: false
 ```
 
 Please refer to the [configuration section]({{< ref "docs/deployment/config#state-changelog-options" >}}) for other options.
@@ -460,7 +459,6 @@ Resuming from both savepoints and checkpoints is supported:
 
 ### Limitations
  - At most one concurrent checkpoint
- - Local recovery not supported
  - As of Flink 1.15, only `filesystem` changelog implementation is available
 - [NO_CLAIM]({{< ref "docs/deployment/config#execution-savepoint-restore-mode" >}}) mode not supported
 


[flink] 01/02: [FLINK-27693][changelog] Support local recovery for non-materialized part

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f9632a07199854c0225bd7f416c038fbf59abe0
Author: fredia <fr...@gmail.com>
AuthorDate: Tue May 31 15:22:59 2022 +0800

    [FLINK-27693][changelog] Support local recovery for non-materialized part
---
 .../fs/AbstractStateChangeFsUploader.java          | 120 ++++++++++++++
 .../fs/DuplicatingOutputStreamWithPos.java         | 174 +++++++++++++++++++++
 .../fs/DuplicatingStateChangeFsUploader.java       | 111 +++++++++++++
 .../changelog/fs/FsStateChangelogStorage.java      |  50 ++++--
 .../fs/FsStateChangelogStorageFactory.java         |   8 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++++++++++---
 .../flink/changelog/fs/OutputStreamWithPos.java    |  54 ++++++-
 .../flink/changelog/fs/StateChangeFormat.java      |  10 +-
 .../flink/changelog/fs/StateChangeFsUploader.java  | 104 ++----------
 .../changelog/fs/StateChangeUploadScheduler.java   |  34 ++--
 .../flink/changelog/fs/StateChangeUploader.java    |  33 +++-
 .../apache/flink/changelog/fs/UploadResult.java    |  41 ++++-
 .../fs/BatchingStateChangeUploadSchedulerTest.java |   6 +-
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  45 ++++--
 .../changelog/fs/FsStateChangelogStorageTest.java  |   7 +-
 .../fs/FsStateChangelogWriterSqnTest.java          |   6 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   |  25 ++-
 .../state/ChangelogTaskLocalStateStore.java        |  33 +++-
 .../TaskExecutorStateChangelogStoragesManager.java |   6 +-
 .../state/changelog/LocalChangelogRegistry.java    |  64 ++++++++
 .../changelog/LocalChangelogRegistryImpl.java      | 118 ++++++++++++++
 .../changelog/StateChangelogStorageFactory.java    |   6 +-
 .../changelog/StateChangelogStorageLoader.java     |   8 +-
 .../state/changelog/StateChangelogWriter.java      |  21 ++-
 .../InMemoryStateChangelogStorageFactory.java      |   6 +-
 .../inmemory/InMemoryStateChangelogWriter.java     |  15 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   5 +-
 .../state/ChangelogTaskLocalStateStoreTest.java    |   2 +
 ...kExecutorStateChangelogStoragesManagerTest.java |  60 +++++--
 .../changelog/LocalChangelogRegistryTest.java      |  56 +++++++
 .../inmemory/StateChangelogStorageLoaderTest.java  |  16 +-
 .../inmemory/StateChangelogStorageTest.java        |   7 +-
 ...cutorExecutionDeploymentReconciliationTest.java |  14 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |  14 ++
 .../changelog/ChangelogKeyedStateBackend.java      |  28 +++-
 .../state/changelog/ChangelogTruncateHelper.java   |   1 +
 .../changelog/ChangelogStateBackendTestUtils.java  |   4 +-
 .../state/changelog/ChangelogStateDiscardTest.java |  20 ++-
 .../state/changelog/StateChangeLoggerTestBase.java |   7 +-
 .../ChangelogLocalRecoveryITCase.java              |   6 +-
 40 files changed, 1219 insertions(+), 237 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
new file mode 100644
index 00000000000..3dd3dfcc70f
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/** Base implementation of StateChangeUploader. */
+public abstract class AbstractStateChangeFsUploader implements StateChangeUploader {
+
+    private final StateChangeFormat format;
+    private final Clock clock;
+    private final TaskChangelogRegistry changelogRegistry;
+    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
+    protected final ChangelogStorageMetricGroup metrics;
+    protected final boolean compression;
+    protected final int bufferSize;
+
+    public AbstractStateChangeFsUploader(
+            boolean compression,
+            int bufferSize,
+            ChangelogStorageMetricGroup metrics,
+            TaskChangelogRegistry changelogRegistry,
+            BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        this.format = new StateChangeFormat();
+        this.compression = compression;
+        this.bufferSize = bufferSize;
+        this.metrics = metrics;
+        this.clock = SystemClock.getInstance();
+        this.changelogRegistry = changelogRegistry;
+        this.handleFactory = handleFactory;
+    }
+
+    abstract OutputStreamWithPos prepareStream() throws IOException;
+
+    @Override
+    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+        metrics.getUploadsCounter().inc();
+        long start = clock.relativeTimeNanos();
+        UploadTasksResult result = uploadInternal(tasks);
+        metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start);
+        metrics.getUploadSizes().update(result.getStateSize());
+        return result;
+    }
+
+    private UploadTasksResult uploadInternal(Collection<UploadTask> tasks) throws IOException {
+        try (OutputStreamWithPos stream = prepareStream()) {
+            final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets =
+                    new HashMap<>();
+            for (UploadTask task : tasks) {
+                tasksOffsets.put(task, format.write(stream, task.changeSets));
+            }
+            StreamStateHandle handle = stream.getHandle(handleFactory);
+            changelogRegistry.startTracking(
+                    handle,
+                    tasks.stream()
+                            .flatMap(t -> t.getChangeSets().stream())
+                            .map(StateChangeSet::getLogId)
+                            .collect(Collectors.toSet()));
+            if (stream instanceof DuplicatingOutputStreamWithPos) {
+                StreamStateHandle localHandle =
+                        ((DuplicatingOutputStreamWithPos) stream).getSecondaryHandle(handleFactory);
+                changelogRegistry.startTracking(
+                        localHandle,
+                        tasks.stream()
+                                .flatMap(t -> t.getChangeSets().stream())
+                                .map(StateChangeSet::getLogId)
+                                .collect(Collectors.toSet()));
+                return new UploadTasksResult(tasksOffsets, handle, localHandle);
+            }
+            // WARN: streams have to be closed before returning the results
+            // otherwise JM may receive invalid handles
+            return new UploadTasksResult(tasksOffsets, handle);
+        } catch (IOException e) {
+            metrics.getUploadFailuresCounter().inc();
+            try (Closer closer = Closer.create()) {
+                closer.register(
+                        () -> {
+                            throw e;
+                        });
+                tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
+            }
+        }
+        return null; // closer above throws an exception
+    }
+
+    protected String generateFileName() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java
new file mode 100644
index 00000000000..b977bd3de0d
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.BiFunction;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The
+ * difference is that this stream does not delete the file when {@link #close()}.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+    private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+    private OutputStream secondaryStream;
+    private OutputStream originalSecondaryStream;
+    private final Path secondaryPath;
+
+    /**
+     * Stores a potential exception that occurred while interacting with {@link #secondaryStream}.
+     */
+    private Exception secondaryStreamException;
+
+    public DuplicatingOutputStreamWithPos(
+            OutputStream primaryStream,
+            Path primaryPath,
+            OutputStream secondaryStream,
+            Path secondaryPath) {
+        super(primaryStream, primaryPath);
+        this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+        this.originalSecondaryStream = Preconditions.checkNotNull(secondaryStream);
+        this.secondaryPath = Preconditions.checkNotNull(secondaryPath);
+    }
+
+    @Override
+    public void wrap(boolean compression, int bufferSize) throws IOException {
+        super.wrap(compression, bufferSize);
+        this.secondaryStream = wrapInternal(compression, bufferSize, this.originalSecondaryStream);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b);
+            } catch (Exception ex) {
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        outputStream.write(b);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during write to secondary stream");
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos += b.length;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b, off, len);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during writing to secondary stream");
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.flush();
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during flushing secondary stream");
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        Exception exCollector = null;
+
+        try {
+            super.close();
+        } catch (Exception closeEx) {
+            exCollector = ExceptionUtils.firstOrSuppressed(closeEx, exCollector);
+        }
+
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.close();
+                originalSecondaryStream.close();
+            } catch (Exception closeEx) {
+                getSecondaryPath().getFileSystem().delete(getSecondaryPath(), true);
+                handleSecondaryStreamOnException(closeEx);
+            }
+        }
+
+        if (exCollector != null) {
+            throw new IOException("Exception while closing duplicating stream.", exCollector);
+        }
+    }
+
+    private void handleSecondaryStreamOnException(Exception ex) {
+
+        Preconditions.checkState(
+                secondaryStreamException == null,
+                "Secondary stream already failed from previous exception!");
+        try {
+            secondaryStream.close();
+        } catch (Exception closeEx) {
+            ex = ExceptionUtils.firstOrSuppressed(closeEx, ex);
+        }
+
+        secondaryStreamException = Preconditions.checkNotNull(ex);
+    }
+
+    public Path getSecondaryPath() {
+        return secondaryPath;
+    }
+
+    public StreamStateHandle getSecondaryHandle(
+            BiFunction<Path, Long, StreamStateHandle> handleFactory) throws IOException {
+        if (secondaryStreamException == null) {
+            return handleFactory.apply(secondaryPath, this.pos);
+        } else {
+            throw new IOException(
+                    "Secondary stream previously failed exceptionally", secondaryStreamException);
+        }
+    }
+}
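
As a quick orientation for the class above: writes are duplicated to the secondary (local) stream on a best-effort basis, a failing secondary never fails the write path, and only getSecondaryHandle() reports the failure. A minimal usage sketch under those assumptions (paths, buffer size and error handling are made up; the real caller is DuplicatingStateChangeFsUploader, shown next):

    package org.apache.flink.changelog.fs; // the class is package-private, so the sketch lives here

    import org.apache.flink.core.fs.FileSystem.WriteMode;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.StreamStateHandle;
    import org.apache.flink.runtime.state.filesystem.FileStateHandle;

    import java.io.IOException;

    class DuplicatingStreamUsageSketch {

        /** Writes payload to both paths; returns the primary handle even if the local copy fails. */
        static StreamStateHandle writeDuplicated(Path remotePath, Path localPath, byte[] payload)
                throws IOException {
            DuplicatingOutputStreamWithPos out =
                    new DuplicatingOutputStreamWithPos(
                            remotePath.getFileSystem().create(remotePath, WriteMode.NO_OVERWRITE),
                            remotePath,
                            localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE),
                            localPath);
            out.wrap(/* compression */ false, /* bufferSize */ 4096); // flag byte + optional Snappy + buffering
            out.write(payload); // duplicated; an exception on the local stream is recorded, not thrown
            out.close();

            StreamStateHandle remoteHandle = out.getHandle(FileStateHandle::new);
            try {
                StreamStateHandle localHandle = out.getSecondaryHandle(FileStateHandle::new);
                // keep localHandle for task-local recovery
            } catch (IOException localCopyFailed) {
                // the local copy broke at some point; recovery falls back to the remote handle only
            }
            return remoteHandle;
        }
    }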
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
new file mode 100644
index 00000000000..cf99ed17683
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.changelog.fs.StateChangeFsUploader.PATH_SUB_DIR;
+import static org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLocalTaskOwnedDirectory;
+
+/**
+ * A StateChangeFsUploader implementation that writes the changes to remote and local.
+ * <li>Local dstl files are only managed by TM side, {@link LocalChangelogRegistry}, {@link
+ *     TaskChangelogRegistry} and {@link ChangelogTaskLocalStateStore} are responsible for managing
+ *     them.
+ * <li>Remote dstl files are managed by TM side and JM side, {@link TaskChangelogRegistry} is
+ *     responsible for TM side, and {@link SharedStateRegistry} is responsible for JM side.
+ *
+ *     <p>The total discard logic of local dstl files is:
+ *
+ *     <ol>
+ *       <li>Register files to {@link TaskChangelogRegistry#startTracking} on {@link #upload}.
+ *       <li>Store the meta of files into {@link ChangelogTaskLocalStateStore} by
+ *           AsyncCheckpointRunnable#reportCompletedSnapshotStates().
+ *       <li>Pass control of the file to {@link LocalChangelogRegistry#register} when
+ *           ChangelogKeyedStateBackend#notifyCheckpointComplete() , files of the previous
+ *           checkpoint will be deleted by {@link LocalChangelogRegistry#discardUpToCheckpoint} at
+ *           the same time.
+ *       <li>When ChangelogTruncateHelper#materialized() or
+ *           ChangelogTruncateHelper#checkpointSubsumed() is called, {@link
+ *           TaskChangelogRegistry#notUsed} is responsible for deleting local files.
+ *       <li>When one checkpoint is aborted, the dstl files of this checkpoint will be deleted by
+ *           {@link LocalChangelogRegistry#prune} in {@link FsStateChangelogWriter#reset}.
+ *     </ol>
+ */
+public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DuplicatingStateChangeFsUploader.class);
+
+    private final Path basePath;
+    private final FileSystem fileSystem;
+    private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;
+    private final JobID jobID;
+
+    public DuplicatingStateChangeFsUploader(
+            JobID jobID,
+            Path basePath,
+            FileSystem fileSystem,
+            boolean compression,
+            int bufferSize,
+            ChangelogStorageMetricGroup metrics,
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) {
+        super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new);
+        this.basePath =
+                new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
+        this.fileSystem = fileSystem;
+        this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider;
+        this.jobID = jobID;
+    }
+
+    @Override
+    public OutputStreamWithPos prepareStream() throws IOException {
+        final String fileName = generateFileName();
+        LOG.debug("upload tasks to {}", fileName);
+        Path path = new Path(basePath, fileName);
+        FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+        Path localPath =
+                new Path(
+                        getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID),
+                        fileName);
+        FSDataOutputStream secondaryStream =
+                localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE);
+        DuplicatingOutputStreamWithPos outputStream =
+                new DuplicatingOutputStreamWithPos(primaryStream, path, secondaryStream, localPath);
+        outputStream.wrap(this.compression, this.bufferSize);
+        return outputStream;
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
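
The discard protocol in the Javadoc above is spread over several components, and LocalChangelogRegistryImpl itself is not included in this excerpt. The following is an illustrative, synchronous sketch of the register / discardUpToCheckpoint / prune semantics as described there; it assumes handles are keyed by checkpoint id and that discardUpToCheckpoint drops strictly earlier checkpoints. The real implementation discards asynchronously on an executor and is more involved:

    import org.apache.flink.runtime.state.StreamStateHandle;

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.LongPredicate;

    /** Illustrative sketch only, not the actual LocalChangelogRegistryImpl. */
    class LocalChangelogRegistrySketch {

        /** Local dstl handles keyed by the checkpoint that uses them. */
        private final Map<Long, Set<StreamStateHandle>> handlesByCheckpoint = new HashMap<>();

        /** Called around notifyCheckpointComplete(): the TM takes ownership of the local copy. */
        public void register(StreamStateHandle localHandle, long checkpointId) {
            handlesByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(localHandle);
        }

        /** Drop local files that only earlier checkpoints reference. */
        public void discardUpToCheckpoint(long upTo) {
            discardIf(checkpointId -> checkpointId < upTo);
        }

        /** Called on reset/abort: drop local files written for the aborted checkpoint. */
        public void prune(long abortedCheckpointId) {
            discardIf(checkpointId -> checkpointId == abortedCheckpointId);
        }

        private void discardIf(LongPredicate shouldDiscard) {
            Iterator<Map.Entry<Long, Set<StreamStateHandle>>> it =
                    handlesByCheckpoint.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Set<StreamStateHandle>> entry = it.next();
                if (shouldDiscard.test(entry.getKey())) {
                    entry.getValue()
                            .forEach(
                                    handle -> {
                                        try {
                                            handle.discardState(); // deletes the local dstl file
                                        } catch (Exception e) {
                                            // best effort in this sketch
                                        }
                                    });
                    it.remove();
                }
            }
        }
    }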
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index de7718521a0..5ba18f701ea 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -26,16 +26,22 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_DISCARD_THREADS;
@@ -62,26 +68,42 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
 
     private final TaskChangelogRegistry changelogRegistry;
 
+    @Nullable private LocalChangelogRegistry localChangelogRegistry = LocalChangelogRegistry.NO_OP;
+
+    /** The configuration for local recovery. */
+    @Nonnull private final LocalRecoveryConfig localRecoveryConfig;
+
     public FsStateChangelogStorage(
-            JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration config,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
-        this(jobID, config, metricGroup, defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
+        this(
+                jobID,
+                config,
+                metricGroup,
+                defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)),
+                localRecoveryConfig);
     }
 
     public FsStateChangelogStorage(
             JobID jobID,
             Configuration config,
             TaskManagerJobMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         this(
                 fromConfig(
                         jobID,
                         config,
                         new ChangelogStorageMetricGroup(metricGroup),
-                        changelogRegistry),
+                        changelogRegistry,
+                        localRecoveryConfig),
                 config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes(),
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig);
     }
 
     @VisibleForTesting
@@ -91,7 +113,8 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
             boolean compression,
             int bufferSize,
             ChangelogStorageMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         this(
                 directScheduler(
@@ -104,18 +127,25 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
                                 metricGroup,
                                 changelogRegistry)),
                 PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes(),
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig);
     }
 
     @VisibleForTesting
     public FsStateChangelogStorage(
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
-            TaskChangelogRegistry changelogRegistry) {
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig) {
         super(ChangelogStreamHandleReader.DIRECT_READER);
         this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
         this.changelogRegistry = changelogRegistry;
         this.uploader = uploader;
+        this.localRecoveryConfig = localRecoveryConfig;
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            this.localChangelogRegistry =
+                    new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor());
+        }
     }
 
     @Override
@@ -129,7 +159,9 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
                 uploader,
                 preEmptivePersistThresholdInBytes,
                 mailboxExecutor,
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig,
+                localChangelogRegistry);
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index c81931241a5..edb412e3ea4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
@@ -47,9 +48,12 @@ public class FsStateChangelogStorageFactory implements StateChangelogStorageFact
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
-        return new FsStateChangelogStorage(jobID, configuration, metricGroup);
+        return new FsStateChangelogStorage(jobID, configuration, metricGroup, localRecoveryConfig);
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index e6b5b583885..2481fdcc2e4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -22,8 +22,11 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.SequenceNumberRange;
 import org.apache.flink.runtime.state.changelog.StateChange;
@@ -32,6 +35,7 @@ import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
@@ -139,19 +143,28 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
 
     private final TaskChangelogRegistry changelogRegistry;
 
+    /** The configuration for local recovery. */
+    @Nonnull private final LocalRecoveryConfig localRecoveryConfig;
+
+    private final LocalChangelogRegistry localChangelogRegistry;
+
     FsStateChangelogWriter(
             UUID logId,
             KeyGroupRange keyGroupRange,
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
             MailboxExecutor mailboxExecutor,
-            TaskChangelogRegistry changelogRegistry) {
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig,
+            LocalChangelogRegistry localChangelogRegistry) {
         this.logId = logId;
         this.keyGroupRange = keyGroupRange;
         this.uploader = uploader;
         this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
         this.mailboxExecutor = mailboxExecutor;
         this.changelogRegistry = changelogRegistry;
+        this.localRecoveryConfig = localRecoveryConfig;
+        this.localChangelogRegistry = localChangelogRegistry;
     }
 
     @Override
@@ -185,8 +198,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     }
 
     @Override
-    public CompletableFuture<ChangelogStateHandleStreamImpl> persist(SequenceNumber from)
-            throws IOException {
+    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persist(
+            SequenceNumber from) throws IOException {
         LOG.debug(
                 "persist {} starting from sqn {} (incl.), active sqn: {}",
                 logId,
@@ -195,8 +208,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         return persistInternal(from);
     }
 
-    private CompletableFuture<ChangelogStateHandleStreamImpl> persistInternal(SequenceNumber from)
-            throws IOException {
+    private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> persistInternal(
+            SequenceNumber from) throws IOException {
         ensureCanPersist(from);
         rollover();
         Map<SequenceNumber, StateChangeSet> toUpload = drainTailMap(notUploaded, from);
@@ -206,9 +219,11 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         SequenceNumberRange range = SequenceNumberRange.generic(from, activeSequenceNumber);
         if (range.size() == readyToReturn.size()) {
             checkState(toUpload.isEmpty());
-            return CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn, 0L));
+            return CompletableFuture.completedFuture(
+                    buildSnapshotResult(keyGroupRange, readyToReturn, 0L));
         } else {
-            CompletableFuture<ChangelogStateHandleStreamImpl> future = new CompletableFuture<>();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
+                    new CompletableFuture<>();
             uploadCompletionListeners.add(
                     new UploadCompletionListener(keyGroupRange, range, readyToReturn, future));
             if (!toUpload.isEmpty()) {
@@ -262,6 +277,9 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
                                 // uploaded already truncated, i.e. materialized state changes,
                                 // or closed
                                 changelogRegistry.notUsed(result.streamStateHandle, logId);
+                                if (result.localStreamHandle != null) {
+                                    changelogRegistry.notUsed(result.localStreamHandle, logId);
+                                }
                             }
                         }
                     }
@@ -314,7 +332,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     }
 
     @Override
-    public void confirm(SequenceNumber from, SequenceNumber to) {
+    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {
         checkState(from.compareTo(to) <= 0, "Invalid confirm range: [%s,%s)", from, to);
         checkState(
                 from.compareTo(activeSequenceNumber) <= 0
@@ -329,14 +347,30 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         uploaded.subMap(from, to).values().stream()
                 .map(UploadResult::getStreamStateHandle)
                 .forEach(changelogRegistry::stopTracking);
+
+        // transfer the control of localHandle to localStateRegistry.
+        uploaded.subMap(from, to).values().stream()
+                .map(UploadResult::getLocalStreamHandleStateHandle)
+                .filter(localHandle -> localHandle != null)
+                .forEach(
+                        localHandle -> {
+                            changelogRegistry.stopTracking(localHandle);
+                            localChangelogRegistry.register(localHandle, checkpointId);
+                        });
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
+    }
+
+    @Override
+    public void subsume(long checkpointId) {
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
     }
 
     @Override
-    public void reset(SequenceNumber from, SequenceNumber to) {
-        // do nothing
+    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {
+        localChangelogRegistry.prune(checkpointId);
     }
 
-    private static ChangelogStateHandleStreamImpl buildHandle(
+    private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
             KeyGroupRange keyGroupRange,
             NavigableMap<SequenceNumber, UploadResult> results,
             long incrementalSize) {
@@ -346,12 +380,42 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
             tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), uploadResult.getOffset()));
             size += uploadResult.getSize();
         }
-        return new ChangelogStateHandleStreamImpl(
-                tuples,
-                keyGroupRange,
-                size,
-                incrementalSize,
-                FsStateChangelogStorageFactory.IDENTIFIER);
+        ChangelogStateHandleStreamImpl jmChangelogStateHandle =
+                new ChangelogStateHandleStreamImpl(
+                        tuples,
+                        keyGroupRange,
+                        size,
+                        incrementalSize,
+                        FsStateChangelogStorageFactory.IDENTIFIER);
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            size = 0;
+            List<Tuple2<StreamStateHandle, Long>> localTuples = new ArrayList<>();
+            for (UploadResult uploadResult : results.values()) {
+                if (uploadResult.getLocalStreamHandleStateHandle() != null) {
+                    localTuples.add(
+                            Tuple2.of(
+                                    uploadResult.getLocalStreamHandleStateHandle(),
+                                    uploadResult.getLocalOffset()));
+                    size += uploadResult.getSize();
+                }
+            }
+            ChangelogStateHandleStreamImpl localChangelogStateHandle = null;
+            if (localTuples.size() == tuples.size()) {
+                localChangelogStateHandle =
+                        new ChangelogStateHandleStreamImpl(
+                                localTuples,
+                                keyGroupRange,
+                                size,
+                                0L,
+                                FsStateChangelogStorageFactory.IDENTIFIER);
+                return SnapshotResult.withLocalState(
+                        jmChangelogStateHandle, localChangelogStateHandle);
+
+            } else {
+                LOG.warn("local handles are different from remote");
+            }
+        }
+        return SnapshotResult.of(jmChangelogStateHandle);
     }
 
     @VisibleForTesting
@@ -378,9 +442,10 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         }
     }
 
-    private static final class UploadCompletionListener {
+    private final class UploadCompletionListener {
         private final NavigableMap<SequenceNumber, UploadResult> uploaded;
-        private final CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture;
+        private final CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
+                completionFuture;
         private final KeyGroupRange keyGroupRange;
         private final SequenceNumberRange changeRange;
 
@@ -388,7 +453,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
                 KeyGroupRange keyGroupRange,
                 SequenceNumberRange changeRange,
                 Map<SequenceNumber, UploadResult> uploaded,
-                CompletableFuture<ChangelogStateHandleStreamImpl> completionFuture) {
+                CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
+                        completionFuture) {
             checkArgument(
                     !changeRange.isEmpty(), "Empty change range not allowed: %s", changeRange);
             this.uploaded = new TreeMap<>(uploaded);
@@ -405,7 +471,7 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
                     incrementalSize += uploadResult.getSize();
                     if (uploaded.size() == changeRange.size()) {
                         completionFuture.complete(
-                                buildHandle(keyGroupRange, uploaded, incrementalSize));
+                                buildSnapshotResult(keyGroupRange, uploaded, incrementalSize));
                         return true;
                     }
                 }
@@ -440,6 +506,9 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         LOG.trace("Uploaded state to discard: {}", notUsedState);
         for (UploadResult result : notUsedState.values()) {
             changelogRegistry.notUsed(result.streamStateHandle, logId);
+            if (result.localStreamHandle != null) {
+                changelogRegistry.notUsed(result.localStreamHandle, logId);
+            }
         }
     }
 }
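
Taken together, the writer changes above alter the StateChangelogWriter contract: persist() now completes with a SnapshotResult that can carry both the JM-owned handle and, when local recovery is enabled, a task-local duplicate, and confirm()/reset()/subsume() carry the checkpoint id so local files can be retired per checkpoint. A hedged caller-side sketch, assuming the StateChangelogWriter interface mirrors the signatures shown in this diff (the actual caller is ChangelogKeyedStateBackend, whose changes are not part of this excerpt):

    import org.apache.flink.runtime.state.SnapshotResult;
    import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
    import org.apache.flink.runtime.state.changelog.SequenceNumber;
    import org.apache.flink.runtime.state.changelog.StateChangelogWriter;

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    class ChangelogWriterUsageSketch {

        static CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> snapshot(
                StateChangelogWriter<ChangelogStateHandleStreamImpl> writer, SequenceNumber from)
                throws IOException {
            return writer.persist(from)
                    .thenApply(
                            result -> {
                                // JM-owned handle: goes into the checkpoint acknowledgement.
                                result.getJobManagerOwnedSnapshot();
                                // Task-local handle: null unless local recovery is enabled; kept in
                                // the task-local state store for fast restore.
                                result.getTaskLocalSnapshot();
                                return result;
                            });
        }

        /** Exactly one of these is invoked per checkpoint, depending on its outcome. */
        static void onCheckpointOutcome(
                StateChangelogWriter<ChangelogStateHandleStreamImpl> writer,
                SequenceNumber from,
                SequenceNumber to,
                long checkpointId,
                Outcome outcome) {
            switch (outcome) {
                case CONFIRMED:
                    writer.confirm(from, to, checkpointId); // also retires local files of earlier checkpoints
                    break;
                case ABORTED:
                    writer.reset(from, to, checkpointId); // prunes local files written for this checkpoint
                    break;
                case SUBSUMED:
                    writer.subsume(checkpointId); // discards local files up to this checkpoint
                    break;
            }
        }

        enum Outcome {
            CONFIRMED,
            ABORTED,
            SUBSUMED
        }
    }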
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
index b6be3a766a3..89d71f6dde2 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
@@ -17,15 +17,46 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.function.BiFunction;
 
 class OutputStreamWithPos extends OutputStream {
-    private final OutputStream outputStream;
-    private long pos;
+    protected final Path path;
+    protected OutputStream outputStream;
+    protected long pos;
+    protected boolean compression;
+    protected final OutputStream originalStream;
+
+    public OutputStreamWithPos(OutputStream outputStream, Path path) {
+        this.outputStream = Preconditions.checkNotNull(outputStream);
+        this.originalStream = Preconditions.checkNotNull(outputStream);
+        this.path = Preconditions.checkNotNull(path);
+        this.pos = 0;
+        this.compression = false;
+    }
+
+    protected OutputStream wrapInternal(boolean compression, int bufferSize, OutputStream fsStream)
+            throws IOException {
+        fsStream.write(compression ? 1 : 0);
+        StreamCompressionDecorator instance =
+                compression
+                        ? SnappyStreamCompressionDecorator.INSTANCE
+                        : UncompressedStreamCompressionDecorator.INSTANCE;
+        return new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize);
+    }
 
-    public OutputStreamWithPos(OutputStream outputStream) {
-        this.outputStream = outputStream;
+    public void wrap(boolean compression, int bufferSize) throws IOException {
+        this.compression = compression;
+        this.outputStream = wrapInternal(compression, bufferSize, this.originalStream);
     }
 
     @Override
@@ -53,10 +84,23 @@ class OutputStreamWithPos extends OutputStream {
 
     @Override
     public void close() throws IOException {
-        outputStream.close();
+        try {
+            outputStream.close();
+            originalStream.close();
+        } catch (IOException e) {
+            getPath().getFileSystem().delete(getPath(), true);
+        }
     }
 
     public long getPos() {
         return pos;
     }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public StreamStateHandle getHandle(BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        return handleFactory.apply(path, this.pos);
+    }
 }
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 7a672da4ba4..58d142305cd 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.util.CloseableIterator;
@@ -45,17 +46,18 @@ import static java.util.Comparator.comparing;
 public class StateChangeFormat {
     private static final Logger LOG = LoggerFactory.getLogger(StateChangeFormat.class);
 
-    Map<StateChangeSet, Long> write(OutputStreamWithPos os, Collection<StateChangeSet> changeSets)
-            throws IOException {
+    Map<StateChangeSet, Tuple2<Long, Long>> write(
+            OutputStreamWithPos os, Collection<StateChangeSet> changeSets) throws IOException {
         List<StateChangeSet> sorted = new ArrayList<>(changeSets);
         // using sorting instead of bucketing for simplicity
         sorted.sort(
                 comparing(StateChangeSet::getLogId)
                         .thenComparing(StateChangeSet::getSequenceNumber));
         DataOutputViewStreamWrapper dataOutput = new DataOutputViewStreamWrapper(os);
-        Map<StateChangeSet, Long> pendingResults = new HashMap<>();
+        Map<StateChangeSet, Tuple2<Long, Long>> pendingResults = new HashMap<>();
         for (StateChangeSet changeSet : sorted) {
-            pendingResults.put(changeSet, os.getPos());
+            long pos = os.getPos();
+            pendingResults.put(changeSet, Tuple2.of(pos, pos));
             writeChangeSet(dataOutput, changeSet.getChanges());
         }
         return pendingResults;
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index 8a37a07f460..e9e9662c65f 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -19,52 +19,29 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.util.clock.Clock;
-import org.apache.flink.util.clock.SystemClock;
-
-import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
 
 /**
  * A synchronous {@link StateChangeUploadScheduler} implementation that uploads the changes using
  * {@link FileSystem}.
  */
-public class StateChangeFsUploader implements StateChangeUploader {
+public class StateChangeFsUploader extends AbstractStateChangeFsUploader {
     private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);
 
     @VisibleForTesting public static final String PATH_SUB_DIR = "dstl";
 
     private final Path basePath;
     private final FileSystem fileSystem;
-    private final StateChangeFormat format;
-    private final boolean compression;
-    private final int bufferSize;
-    private final ChangelogStorageMetricGroup metrics;
-    private final Clock clock;
-    private final TaskChangelogRegistry changelogRegistry;
-    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
 
     @VisibleForTesting
     public StateChangeFsUploader(
@@ -95,16 +72,10 @@ public class StateChangeFsUploader implements StateChangeUploader {
             ChangelogStorageMetricGroup metrics,
             TaskChangelogRegistry changelogRegistry,
             BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        super(compression, bufferSize, metrics, changelogRegistry, handleFactory);
         this.basePath =
                 new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
         this.fileSystem = fileSystem;
-        this.format = new StateChangeFormat();
-        this.compression = compression;
-        this.bufferSize = bufferSize;
-        this.metrics = metrics;
-        this.clock = SystemClock.getInstance();
-        this.changelogRegistry = changelogRegistry;
-        this.handleFactory = handleFactory;
     }
 
     @VisibleForTesting
@@ -112,70 +83,15 @@ public class StateChangeFsUploader implements StateChangeUploader {
         return this.basePath;
     }
 
-    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+    @Override
+    public OutputStreamWithPos prepareStream() throws IOException {
         final String fileName = generateFileName();
-        LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
+        LOG.debug("upload tasks to {}", fileName);
         Path path = new Path(basePath, fileName);
-
-        try {
-            return uploadWithMetrics(path, tasks);
-        } catch (IOException e) {
-            metrics.getUploadFailuresCounter().inc();
-            try (Closer closer = Closer.create()) {
-                closer.register(
-                        () -> {
-                            throw e;
-                        });
-                tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
-                closer.register(() -> fileSystem.delete(path, true));
-            }
-        }
-        return null; // closer above throws an exception
-    }
-
-    private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
-            throws IOException {
-        metrics.getUploadsCounter().inc();
-        long start = clock.relativeTimeNanos();
-        UploadTasksResult result = upload(path, tasks);
-        metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start);
-        metrics.getUploadSizes().update(result.getStateSize());
-        return result;
-    }
-
-    private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
-        try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) {
-            fsStream.write(compression ? 1 : 0);
-            try (OutputStreamWithPos stream = wrap(fsStream)) {
-                final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets = new HashMap<>();
-                for (UploadTask task : tasks) {
-                    tasksOffsets.put(task, format.write(stream, task.changeSets));
-                }
-                StreamStateHandle handle = handleFactory.apply(path, stream.getPos());
-                changelogRegistry.startTracking(
-                        handle,
-                        tasks.stream()
-                                .flatMap(t -> t.getChangeSets().stream())
-                                .map(StateChangeSet::getLogId)
-                                .collect(Collectors.toSet()));
-                // WARN: streams have to be closed before returning the results
-                // otherwise JM may receive invalid handles
-                return new UploadTasksResult(tasksOffsets, handle);
-            }
-        }
-    }
-
-    private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException {
-        StreamCompressionDecorator instance =
-                compression
-                        ? SnappyStreamCompressionDecorator.INSTANCE
-                        : UncompressedStreamCompressionDecorator.INSTANCE;
-        return new OutputStreamWithPos(
-                new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
-    }
-
-    private String generateFileName() {
-        return UUID.randomUUID().toString();
+        OutputStreamWithPos outputStream =
+                new OutputStreamWithPos(fileSystem.create(path, WriteMode.NO_OVERWRITE), path);
+        outputStream.wrap(this.compression, this.bufferSize);
+        return outputStream;
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 6e3b31a8ab3..19f09a151e6 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -88,21 +89,34 @@ public interface StateChangeUploadScheduler extends AutoCloseable {
             JobID jobID,
             ReadableConfig config,
             ChangelogStorageMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         Path basePath = new Path(config.get(BASE_PATH));
         long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes();
         checkArgument(bytes <= Integer.MAX_VALUE);
         int bufferSize = (int) bytes;
-        StateChangeFsUploader store =
-                new StateChangeFsUploader(
-                        jobID,
-                        basePath,
-                        basePath.getFileSystem(),
-                        config.get(COMPRESSION_ENABLED),
-                        bufferSize,
-                        metricGroup,
-                        changelogRegistry);
+        StateChangeUploader store =
+                localRecoveryConfig.isLocalRecoveryEnabled()
+                        ? new DuplicatingStateChangeFsUploader(
+                                jobID,
+                                basePath,
+                                basePath.getFileSystem(),
+                                config.get(COMPRESSION_ENABLED),
+                                bufferSize,
+                                metricGroup,
+                                changelogRegistry,
+                                localRecoveryConfig
+                                        .getLocalStateDirectoryProvider()
+                                        .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
+                        : new StateChangeFsUploader(
+                                jobID,
+                                basePath,
+                                basePath.getFileSystem(),
+                                config.get(COMPRESSION_ENABLED),
+                                bufferSize,
+                                metricGroup,
+                                changelogRegistry);
         BatchingStateChangeUploadScheduler batchingStore =
                 new BatchingStateChangeUploadScheduler(
                         config.get(PERSIST_DELAY).toMillis(),
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
index 1f18fe25d8b..537aa7947e5 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
@@ -19,10 +19,13 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -46,27 +49,45 @@ public interface StateChangeUploader extends AutoCloseable {
 
     /** Result of executing one or more {@link UploadTask upload tasks}. */
     final class UploadTasksResult {
-        private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
+        private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets;
         private final StreamStateHandle handle;
+        private final StreamStateHandle localHandle;
+
+        public UploadTasksResult(
+                Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
+                StreamStateHandle handle) {
+            this(tasksOffsets, handle, null);
+        }
 
         public UploadTasksResult(
-                Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) {
+                Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> tasksOffsets,
+                StreamStateHandle handle,
+                @Nullable StreamStateHandle localHandle) {
             this.tasksOffsets = unmodifiableMap(tasksOffsets);
             this.handle = Preconditions.checkNotNull(handle);
+            this.localHandle = localHandle;
         }
 
         public void complete() {
-            for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : tasksOffsets.entrySet()) {
+            for (Map.Entry<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> entry :
+                    tasksOffsets.entrySet()) {
                 UploadTask task = entry.getKey();
-                Map<StateChangeSet, Long> offsets = entry.getValue();
+                Map<StateChangeSet, Tuple2<Long, Long>> offsets = entry.getValue();
                 task.complete(buildResults(handle, offsets));
             }
         }
 
         private List<UploadResult> buildResults(
-                StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
+                StreamStateHandle handle, Map<StateChangeSet, Tuple2<Long, Long>> offsets) {
             return offsets.entrySet().stream()
-                    .map(e -> UploadResult.of(handle, e.getKey(), e.getValue()))
+                    .map(
+                            e ->
+                                    UploadResult.of(
+                                            handle,
+                                            localHandle,
+                                            e.getKey(),
+                                            e.getValue().f0,
+                                            e.getValue().f1))
                     .collect(toList());
         }
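Note (editorial sketch, not part of the patch): an uploader now reports, per change set, a Tuple2 of offsets (f0 = offset in the remote file, f1 = offset in the local copy) and may attach an optional local handle to the result. A minimal sketch, assuming task, remoteHandle and localHandle already exist in the surrounding code:

    // offsets of each change set in the written file(s); 0L/0L used here as placeholder values
    Map<StateChangeSet, Tuple2<Long, Long>> offsets =
            task.changeSets.stream()
                    .collect(Collectors.toMap(Function.identity(), cs -> Tuple2.of(0L, 0L)));
    UploadTasksResult result =
            new UploadTasksResult(
                    Collections.singletonMap(task, offsets), remoteHandle, localHandle);
    result.complete(); // completes the task with UploadResults carrying both handles and offsets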
 
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
index b2caa5c3bd9..385c502b076 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
@@ -21,13 +21,17 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Result of uploading state changes. */
 @Internal
 public final class UploadResult {
     public final StreamStateHandle streamStateHandle;
+    @Nullable public final StreamStateHandle localStreamHandle;
     public final long offset;
+    public final long localOffset;
     public final SequenceNumber sequenceNumber;
     public final long size;
 
@@ -36,24 +40,55 @@ public final class UploadResult {
             long offset,
             SequenceNumber sequenceNumber,
             long size) {
+        this(streamStateHandle, null, offset, offset, sequenceNumber, size);
+    }
+
+    public UploadResult(
+            StreamStateHandle streamStateHandle,
+            @Nullable StreamStateHandle localStreamHandle,
+            long offset,
+            long localOffset,
+            SequenceNumber sequenceNumber,
+            long size) {
         this.streamStateHandle = checkNotNull(streamStateHandle);
+        this.localStreamHandle = localStreamHandle;
         this.offset = offset;
+        this.localOffset = localOffset;
         this.sequenceNumber = checkNotNull(sequenceNumber);
         this.size = size;
     }
 
-    public static UploadResult of(StreamStateHandle handle, StateChangeSet changeSet, long offset) {
-        return new UploadResult(handle, offset, changeSet.getSequenceNumber(), changeSet.getSize());
+    public static UploadResult of(
+            StreamStateHandle handle,
+            StreamStateHandle localHandle,
+            StateChangeSet changeSet,
+            long offset,
+            long localOffset) {
+        return new UploadResult(
+                handle,
+                localHandle,
+                offset,
+                localOffset,
+                changeSet.getSequenceNumber(),
+                changeSet.getSize());
     }
 
     public StreamStateHandle getStreamStateHandle() {
         return streamStateHandle;
     }
 
+    public StreamStateHandle getLocalStreamHandleStateHandle() {
+        return localStreamHandle;
+    }
+
     public long getOffset() {
         return offset;
     }
 
+    public long getLocalOffset() {
+        return localOffset;
+    }
+
     public SequenceNumber getSequenceNumber() {
         return sequenceNumber;
     }
@@ -66,6 +101,8 @@ public final class UploadResult {
     public String toString() {
         return "streamStateHandle="
                 + streamStateHandle
+                + ", localStreamHandle="
+                + localStreamHandle
                 + ", size="
                 + size
                 + ", offset="
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 21e6e932f8c..d70c3f63082 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -462,13 +462,13 @@ class BatchingStateChangeUploadSchedulerTest {
             return uploadsCounter.get();
         }
 
-        private Map<UploadTask, Map<StateChangeSet, Long>> withOffsets(
+        private Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> withOffsets(
                 Collection<UploadTask> tasks) {
             return tasks.stream().collect(toMap(identity(), this::withOffsets));
         }
 
-        private Map<StateChangeSet, Long> withOffsets(UploadTask task) {
-            return task.changeSets.stream().collect(toMap(identity(), ign -> 0L));
+        private Map<StateChangeSet, Tuple2<Long, Long>> withOffsets(UploadTask task) {
+            return task.changeSets.stream().collect(toMap(identity(), ign -> Tuple2.of(0L, 0L)));
         }
     }
 
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 25acff4f606..c86276d0ad7 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.changelog.fs;
  */
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
@@ -26,6 +27,7 @@ import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 
@@ -69,7 +71,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
@@ -94,7 +97,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
 
             // upload single byte to infer header size
@@ -127,7 +131,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
 
             int numUploads = 5;
@@ -178,7 +183,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter[] writers = new FsStateChangelogWriter[numWriters];
         for (int i = 0; i < numWriters; i++) {
             writers[i] =
@@ -230,7 +238,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter writer = createWriter(storage);
 
         try {
@@ -272,7 +283,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter writer = createWriter(storage);
 
         try {
@@ -335,7 +349,11 @@ public class ChangelogStorageMetricsTest {
                                 metrics.getTotalAttemptsPerUpload()),
                         metrics);
         try (FsStateChangelogStorage storage =
-                new FsStateChangelogStorage(batcher, Long.MAX_VALUE, TaskChangelogRegistry.NO_OP)) {
+                new FsStateChangelogStorage(
+                        batcher,
+                        Long.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
             int numUploads = 11;
             for (int i = 0; i < numUploads; i++) {
@@ -360,7 +378,7 @@ public class ChangelogStorageMetricsTest {
 
         @Override
         public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
-            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new HashMap<>();
             for (UploadTask uploadTask : tasks) {
                 int currentAttempt = 1 + attemptsPerTask.getOrDefault(uploadTask, 0);
                 if (currentAttempt == maxAttempts) {
@@ -368,7 +386,10 @@ public class ChangelogStorageMetricsTest {
                     map.put(
                             uploadTask,
                             uploadTask.changeSets.stream()
-                                    .collect(Collectors.toMap(Function.identity(), ign -> 0L)));
+                                    .collect(
+                                            Collectors.toMap(
+                                                    Function.identity(),
+                                                    ign -> Tuple2.of(0L, 0L))));
                 } else {
                     attemptsPerTask.put(uploadTask, currentAttempt);
                     throw new IOException();
@@ -420,12 +441,14 @@ public class ChangelogStorageMetricsTest {
                 }
             }
 
-            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new HashMap<>();
             for (UploadTask uploadTask : tasks) {
                 map.put(
                         uploadTask,
                         uploadTask.changeSets.stream()
-                                .collect(Collectors.toMap(Function.identity(), ign -> 0L)));
+                                .collect(
+                                        Collectors.toMap(
+                                                Function.identity(), ign -> Tuple2.of(0L, 0L))));
             }
             return new UploadTasksResult(map, new EmptyStreamStateHandle());
         }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 01b45762f5c..03fc8a9b3e4 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.Bloc
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -57,7 +58,8 @@ public class FsStateChangelogStorageTest
                 compression,
                 1024 * 1024 * 10,
                 createUnregisteredChangelogStorageMetricGroup(),
-                TaskChangelogRegistry.NO_OP);
+                TaskChangelogRegistry.NO_OP,
+                TestLocalRecoveryConfig.disabled());
     }
 
     /**
@@ -103,7 +105,8 @@ public class FsStateChangelogStorageTest
                         new FsStateChangelogStorage(
                                         scheduler,
                                         0,
-                                        TaskChangelogRegistry.NO_OP /* persist immediately */)
+                                        TaskChangelogRegistry.NO_OP, /* persist immediately */
+                                        TestLocalRecoveryConfig.disabled())
                                 .createWriter(
                                         new OperatorID().toString(),
                                         KeyGroupRange.of(0, 0),
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 926f188bb5c..4ea8f665002 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -80,7 +82,9 @@ public class FsStateChangelogWriterSqnTest {
                                 new TestingStateChangeUploader()),
                         Long.MAX_VALUE,
                         new SyncMailboxExecutor(),
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled(),
+                        LocalChangelogRegistry.NO_OP)) {
             if (writerSqnTestSettings.withAppend) {
                 append(writer);
             }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index b9e6227cf1d..d2a3aa61f79 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -72,12 +75,15 @@ class FsStateChangelogWriterTest {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
-                    CompletableFuture<ChangelogStateHandleStreamImpl> future =
+                    CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
                             writer.persist(append(writer, bytes));
                     assertSubmittedOnly(uploader, bytes);
                     uploader.completeUpload();
                     assertThat(
-                                    getOnlyElement(future.get().getHandlesAndOffsets())
+                                    getOnlyElement(
+                                                    future.get()
+                                                            .getJobManagerOwnedSnapshot()
+                                                            .getHandlesAndOffsets())
                                             .f0
                                             .asBytesIfInMemory()
                                             .get())
@@ -94,7 +100,7 @@ class FsStateChangelogWriterTest {
                     writer.persist(sqn);
                     uploader.completeUpload();
                     uploader.reset();
-                    writer.confirm(sqn, writer.nextSequenceNumber());
+                    writer.confirm(sqn, writer.nextSequenceNumber(), 1L);
                     writer.persist(sqn);
                     assertNoUpload(uploader, "confirmed changes shouldn't be re-uploaded");
                 });
@@ -134,7 +140,7 @@ class FsStateChangelogWriterTest {
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     SequenceNumber sqn = append(writer, bytes);
-                    writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE));
+                    writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE), Long.MAX_VALUE);
                     uploader.reset();
                     writer.persist(sqn);
                     assertSubmittedOnly(uploader, bytes);
@@ -149,7 +155,9 @@ class FsStateChangelogWriterTest {
                                         (writer, uploader) -> {
                                             byte[] bytes = getBytes();
                                             SequenceNumber sqn = append(writer, bytes);
-                                            CompletableFuture<ChangelogStateHandleStreamImpl>
+                                            CompletableFuture<
+                                                            SnapshotResult<
+                                                                    ChangelogStateHandleStreamImpl>>
                                                     future = writer.persist(sqn);
                                             uploader.failUpload(new RuntimeException("test"));
                                             try {
@@ -186,7 +194,8 @@ class FsStateChangelogWriterTest {
                     uploader.failUpload(new RuntimeException("test"));
                     uploader.reset();
                     SequenceNumber sqn2 = append(writer, bytes);
-                    CompletableFuture<ChangelogStateHandleStreamImpl> future = writer.persist(sqn2);
+                    CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
+                            writer.persist(sqn2);
                     uploader.completeUpload();
                     future.get();
                 });
@@ -225,7 +234,9 @@ class FsStateChangelogWriterTest {
                         StateChangeUploadScheduler.directScheduler(uploader),
                         appendPersistThreshold,
                         new SyncMailboxExecutor(),
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled(),
+                        LocalChangelogRegistry.NO_OP)) {
             test.accept(writer, uploader);
         }
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
index a814589d45f..23b2b6fd42a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +46,7 @@ import java.util.concurrent.Executor;
 import java.util.function.LongPredicate;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Changelog's implementation of a {@link TaskLocalStateStore}. */
@@ -97,6 +100,21 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
         }
     }
 
+    public static Path getLocalTaskOwnedDirectory(
+            LocalRecoveryDirectoryProvider provider, JobID jobID) {
+        File outDir =
+                provider.selectAllocationBaseDirectory(
+                        (jobID.hashCode() & Integer.MAX_VALUE)
+                                % provider.allocationBaseDirsCount());
+        if (!outDir.exists() && !outDir.mkdirs()) {
+            LOG.error(
+                    "Local state base directory {} does not exist and could not be created.",
+                    outDir);
+        }
+        return new Path(
+                String.format("%s/jid_%s", outDir.toURI(), jobID), CHECKPOINT_TASK_OWNED_STATE_DIR);
+    }
+
     @Override
     public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) {
         if (checkpointId < lastCheckpointId) {
@@ -137,13 +155,13 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
 
         discardExecutor.execute(
                 () ->
-                        syncDiscardDirectoryForCollection(
+                        syncDiscardFileForCollection(
                                 materializationToRemove.stream()
                                         .map(super::getCheckpointDirectory)
                                         .collect(Collectors.toList())));
     }
 
-    private void syncDiscardDirectoryForCollection(Collection<File> toDiscard) {
+    private void syncDiscardFileForCollection(Collection<File> toDiscard) {
         for (File directory : toDiscard) {
             if (directory.exists()) {
                 try {
@@ -181,6 +199,17 @@ public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
     @Override
     public CompletableFuture<Void> dispose() {
         deleteMaterialization(id -> true);
+        // Delete all ChangelogStateHandles in the task-owned directory.
+        discardExecutor.execute(
+                () ->
+                        syncDiscardFileForCollection(
+                                Collections.singleton(
+                                        new File(
+                                                getLocalTaskOwnedDirectory(
+                                                                getLocalRecoveryDirectoryProvider(),
+                                                                jobID)
+                                                        .toUri()))));
+
         synchronized (lock) {
             mapToMaterializationId.clear();
         }
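Note (sketch, assuming provider is the task's LocalRecoveryDirectoryProvider): the task-owned local directory resolves to <allocation-base-dir>/jid_<jobID>/taskowned, and dispose() now schedules its deletion as well:

    Path taskOwnedDir =
            ChangelogTaskLocalStateStore.getLocalTaskOwnedDirectory(provider, new JobID(1L, 1L));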
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 59c3c090bf3..b0d928f7751 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -89,7 +89,8 @@ public class TaskExecutorStateChangelogStoragesManager {
     public StateChangelogStorage<?> stateChangelogStorageForJob(
             @Nonnull JobID jobId,
             Configuration configuration,
-            TaskManagerJobMetricGroup metricGroup)
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         synchronized (lock) {
             if (closed) {
@@ -103,7 +104,8 @@ public class TaskExecutorStateChangelogStoragesManager {
 
             if (stateChangelogStorage == null) {
                 StateChangelogStorage<?> loaded =
-                        StateChangelogStorageLoader.load(jobId, configuration, metricGroup);
+                        StateChangelogStorageLoader.load(
+                                jobId, configuration, metricGroup, localRecoveryConfig);
                 stateChangelogStorage = Optional.ofNullable(loaded);
                 changelogStoragesByJobId.put(jobId, stateChangelogStorage);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
new file mode 100644
index 00000000000..a2ffb66b84d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+/** This registry is responsible for deleting changelog's local handles which are no longer in use. */
+@Internal
+public interface LocalChangelogRegistry {
+    LocalChangelogRegistry NO_OP =
+            new LocalChangelogRegistry() {
+                @Override
+                public void register(StreamStateHandle handle, long checkpointID) {}
+
+                @Override
+                public void discardUpToCheckpoint(long upTo) {}
+
+                @Override
+                public void prune(long checkpointID) {}
+            };
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete.
+     *
+     * @param handle the local handle to register.
+     * @param checkpointID the latest checkpoint that uses the handle.
+     */
+    void register(StreamStateHandle handle, long checkpointID);
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete and
+     * ChangelogKeyedStateBackend#notifyCheckpointSubsumed. Remote dstl handles are unregistered
+     * by {@link CompletedCheckpointStore#addCheckpointAndSubsumeOldestOne}; local dstl handles
+     * are unregistered when the checkpoint completes, because only one checkpoint is kept for
+     * local recovery.
+     *
+     * @param upTo the lowest checkpoint ID that is still valid.
+     */
+    void discardUpToCheckpoint(long upTo);
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointAborted.
+     *
+     * @param checkpointID the checkpoint that was aborted.
+     */
+    void prune(long checkpointID);
+}
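Note (illustrative lifecycle, not part of the patch): LocalChangelogRegistryImpl below is the implementation added by this change; a same-thread executor and a ByteStreamStateHandle stand in for the real IO executor and local handle:

    LocalChangelogRegistry registry = new LocalChangelogRegistryImpl(Runnable::run);
    StreamStateHandle localHandle = new ByteStreamStateHandle("dstl-local", new byte[8]);
    registry.register(localHandle, 2L);  // checkpoint 2 is the latest user of this handle
    registry.discardUpToCheckpoint(2L);  // discards handles last used before checkpoint 2
    registry.prune(3L);                  // discards handles registered for the aborted checkpoint 3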
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
new file mode 100644
index 00000000000..acb8ae318ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.stream.Collectors;
+
+@Internal
+public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalChangelogRegistryImpl.class);
+    /**
+     * All registered handles. (PhysicalStateHandleID, (handle, checkpointID)) represents a handle
+     * and the latest checkpoint that refers to this handle.
+     */
+    private final Map<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>>
+            handleToLastUsedCheckpointID = new ConcurrentHashMap<>();
+
+    /** Executor for async state deletion. */
+    private final Executor asyncDisposalExecutor;
+
+    public LocalChangelogRegistryImpl(Executor ioExecutor) {
+        this.asyncDisposalExecutor = ioExecutor;
+    }
+
+    public void register(StreamStateHandle handle, long checkpointID) {
+        handleToLastUsedCheckpointID.compute(
+                handle.getStreamStateHandleID(),
+                (k, v) -> {
+                    if (v == null) {
+                        return Tuple2.of(handle, checkpointID);
+                    } else {
+                        Preconditions.checkState(handle.equals(v.f0));
+                        return Tuple2.of(handle, Math.max(v.f1, checkpointID));
+                    }
+                });
+    }
+
+    public void discardUpToCheckpoint(long upTo) {
+        List<StreamStateHandle> handles = new ArrayList<>();
+        synchronized (handleToLastUsedCheckpointID) {
+            Iterator<Tuple2<StreamStateHandle, Long>> iterator =
+                    handleToLastUsedCheckpointID.values().iterator();
+            while (iterator.hasNext()) {
+                Tuple2<StreamStateHandle, Long> entry = iterator.next();
+                if (entry.f1 < upTo) {
+                    handles.add(entry.f0);
+                    iterator.remove();
+                }
+            }
+        }
+        for (StreamStateHandle handle : handles) {
+            scheduleAsyncDelete(handle);
+        }
+    }
+
+    public void prune(long checkpointID) {
+        Set<StreamStateHandle> handles =
+                handleToLastUsedCheckpointID.values().stream()
+                        .filter(tuple -> tuple.f1 == checkpointID)
+                        .map(tuple -> tuple.f0)
+                        .collect(Collectors.toSet());
+        for (StreamStateHandle handle : handles) {
+            scheduleAsyncDelete(handle);
+        }
+    }
+
+    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
+        if (streamStateHandle != null) {
+            LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
+            Runnable discardRunner =
+                    () -> {
+                        try {
+                            streamStateHandle.discardState();
+                        } catch (Exception exception) {
+                            LOG.warn(
+                                    "A problem occurred during asynchronous disposal of a stream handle {}.",
+                                    streamStateHandle,
+                                    exception);
+                        }
+                    };
+            try {
+                asyncDisposalExecutor.execute(discardRunner);
+            } catch (RejectedExecutionException ex) {
+                discardRunner.run();
+            }
+        }
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index be86a416e6c..37d9091ea30 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 
 import java.io.IOException;
 
@@ -36,7 +37,10 @@ public interface StateChangelogStorageFactory {
 
     /** Create the storage based on a configuration. */
     StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException;
 
     /** Create the storage for recovery. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 622c09f9956..ee7d39f8a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -87,7 +88,10 @@ public class StateChangelogStorageLoader {
 
     @Nullable
     public static StateChangelogStorage<?> load(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         final String identifier =
                 configuration
@@ -100,7 +104,7 @@ public class StateChangelogStorageLoader {
             return null;
         } else {
             LOG.info("Creating a changelog storage with name '{}'.", identifier);
-            return factory.createStorage(jobID, configuration, metricGroup);
+            return factory.createStorage(jobID, configuration, metricGroup, localRecoveryConfig);
         }
     }
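Note (sketch mirroring the test usage added in this change; it assumes the loader has already been initialized with the plugin manager and that the unregistered metric-group test helper is on the classpath):

    StateChangelogStorage<?> storage =
            StateChangelogStorageLoader.load(
                    new JobID(),
                    new Configuration(),
                    createUnregisteredTaskManagerJobMetricGroup(),
                    TestLocalRecoveryConfig.disabled());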
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 0ee4f615a31..756528139d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.SnapshotResult;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -41,13 +42,13 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
     /**
      * Durably persist previously {@link #append(int, byte[]) appended} data starting from the
      * provided {@link SequenceNumber} and up to the latest change added. After this call, one of
-     * {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber,
-     * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be
-     * called for the corresponding change set. with reset/truncate/confirm methods?
+     * {@link #confirm(SequenceNumber, SequenceNumber, long) confirm}, {@link #reset(SequenceNumber,
+     * SequenceNumber, long) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must
+     * be called for the corresponding change set.
      *
      * @param from inclusive
      */
-    CompletableFuture<Handle> persist(SequenceNumber from) throws IOException;
+    CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from) throws IOException;
 
     /**
      * Truncate this state changelog to free up the resources and collect any garbage. That means:
@@ -71,14 +72,22 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
      *
      * @param from inclusive
      * @param to exclusive
+     * @param checkpointId the checkpoint being confirmed.
      */
-    void confirm(SequenceNumber from, SequenceNumber to);
+    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
+
+    /**
+     * Mark the state changes of the given checkpoint as subsumed.
+     *
+     * @param checkpointId the checkpoint whose state changes are subsumed.
+     */
+    void subsume(long checkpointId);
 
     /**
      * Reset the state the given state changes. Called upon abortion so that if requested later then
      * these changes will be re-uploaded.
      */
-    void reset(SequenceNumber from, SequenceNumber to);
+    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
 
     /**
      * Truncate the tail of log and close it. No new appends will be possible. Any appended but not
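Note (sketch of the updated writer contract; writer, changes and checkpointId are placeholders, and Handle stands for the writer's handle type):

    SequenceNumber from = writer.nextSequenceNumber();
    writer.append(0, changes);                                        // key group 0
    CompletableFuture<SnapshotResult<Handle>> persisted = writer.persist(from);
    writer.confirm(from, writer.nextSequenceNumber(), checkpointId);  // on notifyCheckpointComplete
    writer.subsume(checkpointId);                                     // on notifyCheckpointSubsumed
    writer.reset(from, writer.nextSequenceNumber(), checkpointId);    // on notifyCheckpointAborted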
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index 786a6a7be29..39ca25fde79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
@@ -36,7 +37,10 @@ public class InMemoryStateChangelogStorageFactory implements StateChangelogStora
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig) {
         return new InMemoryStateChangelogStorage();
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 3bd0a969d83..eb07ba47716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -75,11 +76,14 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     }
 
     @Override
-    public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) {
+    public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> persist(
+            SequenceNumber from) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
         return completedFuture(
-                new InMemoryChangelogStateHandle(collectChanges(from), from, sqn, keyGroupRange));
+                SnapshotResult.of(
+                        new InMemoryChangelogStateHandle(
+                                collectChanges(from), from, sqn, keyGroupRange)));
     }
 
     private List<StateChange> collectChanges(SequenceNumber after) {
@@ -113,8 +117,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     }
 
     @Override
-    public void confirm(SequenceNumber from, SequenceNumber to) {}
+    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointID) {}
 
     @Override
-    public void reset(SequenceNumber from, SequenceNumber to) {}
+    public void subsume(long checkpointId) {}
+
+    @Override
+    public void reset(SequenceNumber from, SequenceNumber to, long checkpointID) {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 26b274f5a5c..f4481ea9d80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -700,7 +700,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             try {
                 changelogStorage =
                         changelogStoragesManager.stateChangelogStorageForJob(
-                                jobId, taskManagerConfiguration.getConfiguration(), jobGroup);
+                                jobId,
+                                taskManagerConfiguration.getConfiguration(),
+                                jobGroup,
+                                localStateStore.getLocalRecoveryConfig());
             } catch (IOException e) {
                 throw new TaskSubmissionException(e);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
index 8493064f806..abf058986a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
@@ -131,6 +131,8 @@ public class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTes
         assertTrue(stateSnapshot2.isDiscarded());
         // the materialized part of checkpoint 2 retain, because it still used by checkpoint 3
         assertTrue(checkMaterializedDirExists(2));
+        // checkpoint 1 is retained
+        assertEquals(stateSnapshot1, taskLocalStateStore.retrieveLocalState(1));
         assertTrue(checkMaterializedDirExists(1));
         assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index b0efaa9c427..1ad0c7b8231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -53,16 +53,25 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertEquals(storage1, storage2);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage3 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotEquals(storage1, storage3);
         manager.shutdown();
     }
@@ -79,7 +88,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
         manager.releaseResourcesForJob(jobId1);
@@ -87,7 +99,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotEquals(storage1, storage2);
 
         manager.shutdown();
@@ -104,7 +119,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNull(storage1);
 
         // change the configuration and assert that the result does not change.
@@ -113,19 +131,28 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
                 StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNull(storage2);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage3 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotNull(storage3);
 
         configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
         StateChangelogStorage<?> storage4 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotNull(storage4);
         Assert.assertEquals(storage3, storage4);
 
@@ -144,14 +171,20 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage2 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage2).closed);
 
@@ -207,7 +240,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID,
+                Configuration configuration,
+                TaskManagerJobMetricGroup metricGroup,
+                LocalRecoveryConfig localRecoveryConfig) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
new file mode 100644
index 00000000000..d28c33e603a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link LocalChangelogRegistryImpl}. */
+public class LocalChangelogRegistryTest extends TestLogger {
+
+    @Test
+    public void testRegistryNormal() {
+        LocalChangelogRegistry localStateRegistry =
+                new LocalChangelogRegistryImpl(Executors.directExecutor());
+        TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
+        TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
+        // checkpoint 1: handle1, handle2
+        localStateRegistry.register(handle1, 1);
+        localStateRegistry.register(handle2, 1);
+
+        // checkpoint 2: handle2, handle3
+        TestingStreamStateHandle handle3 = new TestingStreamStateHandle();
+        localStateRegistry.register(handle2, 2);
+        localStateRegistry.register(handle3, 2);
+
+        localStateRegistry.discardUpToCheckpoint(2);
+        assertTrue(handle1.isDisposed());
+        assertFalse(handle2.isDisposed());
+
+        localStateRegistry.discardUpToCheckpoint(3);
+        assertTrue(handle2.isDisposed());
+        assertTrue(handle3.isDisposed());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 6c091eeb546..94782f9dda8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -55,7 +57,8 @@ public class StateChangelogStorageLoaderTest {
                 StateChangelogStorageLoader.load(
                         JobID.generate(),
                         new Configuration(),
-                        createUnregisteredTaskManagerJobMetricGroup()));
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled()));
     }
 
     @Test
@@ -66,7 +69,8 @@ public class StateChangelogStorageLoaderTest {
                         JobID.generate(),
                         new Configuration()
                                 .set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"),
-                        createUnregisteredTaskManagerJobMetricGroup()));
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled()));
     }
 
     @Test
@@ -79,7 +83,8 @@ public class StateChangelogStorageLoaderTest {
                 StateChangelogStorageLoader.load(
                         JobID.generate(),
                         new Configuration(),
-                        createUnregisteredTaskManagerJobMetricGroup());
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         assertTrue(loaded instanceof TestStateChangelogStorage);
     }
 
@@ -120,7 +125,10 @@ public class StateChangelogStorageLoaderTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID,
+                Configuration configuration,
+                TaskManagerJobMetricGroup metricGroup,
+                LocalRecoveryConfig localRecoveryConfig) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index a130ba79723..f0c92ab55ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
@@ -97,10 +98,10 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
                 writer.nextSequenceNumber();
             }
 
-            T handle = writer.persist(prev).get();
+            SnapshotResult<T> res = writer.persist(prev).get();
+            T jmHandle = res.getJobManagerOwnedSnapshot();
             StateChangelogHandleReader<T> reader = client.createReader();
-
-            assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
+            assertByteMapsEqual(appendsByKeyGroup, extract(jmHandle, reader));
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 0739c96e271..60d282da8f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -52,12 +52,15 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.After;
@@ -65,7 +68,9 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
@@ -108,6 +113,8 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
     public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerResource();
 
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
     @Before
     public void setup() {
         haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -136,13 +143,18 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
         final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
         final TestingResourceManagerGateway testingResourceManagerGateway =
                 setupResourceManagerGateway(initialSlotReportFuture);
-
+        final TaskExecutorLocalStateStoresManager localStateStoresManager =
+                new TaskExecutorLocalStateStoresManager(
+                        false,
+                        Reference.owned(new File[] {tmp.newFolder()}),
+                        Executors.directExecutor());
         final TaskManagerServices taskManagerServices =
                 new TaskManagerServicesBuilder()
                         .setTaskSlotTable(
                                 TaskSlotUtils.createTaskSlotTable(
                                         1, timeout, EXECUTOR_RESOURCE.getExecutor()))
                         .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build())
+                        .setTaskStateManager(localStateStoresManager)
                         .build();
 
         final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index f91f9e72b37..35f099b4215 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -51,14 +52,18 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -84,6 +89,8 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
     public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerResource();
 
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
     @Before
     public void setup() {
         UserClassLoaderExtractingInvokable.clearQueue();
@@ -221,6 +228,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
                                 TaskSlotUtils.createTaskSlotTable(
                                         1, EXECUTOR_RESOURCE.getExecutor()))
                         .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
+                        .setTaskStateManager(createTaskExecutorLocalStateStoresManager())
                         .build(),
                 ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                 new TestingHeartbeatServices(),
@@ -231,6 +239,12 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
                 new TestingTaskExecutorPartitionTracker());
     }
 
+    private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
+            throws IOException {
+        return new TaskExecutorLocalStateStoresManager(
+                false, Reference.owned(new File[] {tmp.newFolder()}), Executors.directExecutor());
+    }
+
     public static final class UserClassLoaderExtractingInvokable extends AbstractInvokable {
 
         private static BlockingQueue<ClassLoader> userCodeClassLoaders =
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index eb5792dc23a..9c7fe9d57bb 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -417,22 +417,34 @@ public class ChangelogKeyedStateBackend<K>
 
     private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
             long checkpointId,
-            ChangelogStateHandle delta,
+            SnapshotResult<? extends ChangelogStateHandle> delta,
             ChangelogSnapshotState changelogStateBackendStateCopy) {
 
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
         long persistedSizeOfThisCheckpoint = 0L;
-        if (delta != null && delta.getStateSize() > 0) {
-            prevDeltaCopy.add(delta);
-            persistedSizeOfThisCheckpoint += delta.getCheckpointedSize();
+        if (delta != null
+                && delta.getJobManagerOwnedSnapshot() != null
+                && delta.getJobManagerOwnedSnapshot().getStateSize() > 0) {
+            prevDeltaCopy.add(delta.getJobManagerOwnedSnapshot());
+            persistedSizeOfThisCheckpoint +=
+                    delta.getJobManagerOwnedSnapshot().getCheckpointedSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
-        } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) {
+        } else if (!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()
+                || delta.getTaskLocalSnapshot() != null) {
+            List<ChangelogStateHandle> localDeltaCopy =
+                    new ArrayList<>(
+                            changelogStateBackendStateCopy.getLocalRestoredNonMaterialized());
+            if (delta != null
+                    && delta.getTaskLocalSnapshot() != null
+                    && delta.getTaskLocalSnapshot().getStateSize() > 0) {
+                localDeltaCopy.add(delta.getTaskLocalSnapshot());
+            }
             ChangelogStateBackendHandleImpl jmHandle =
                     new ChangelogStateBackendHandleImpl(
                             changelogStateBackendStateCopy.getMaterializedSnapshot(),
@@ -445,7 +457,7 @@ public class ChangelogKeyedStateBackend<K>
                     jmHandle,
                     new ChangelogStateBackendLocalHandle(
                             changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
-                            prevDeltaCopy,
+                            localDeltaCopy,
                             jmHandle));
         } else {
             return SnapshotResult.of(
@@ -540,7 +552,7 @@ public class ChangelogKeyedStateBackend<K>
             // newer upload instead of the previous one. This newer upload could then be re-used
             // while in fact JM has discarded its results.
             // This might change if the log ownership changes (the method won't likely be needed).
-            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo);
+            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo, checkpointId);
         }
         Long materializationID = materializationIdByCheckpointId.remove(checkpointId);
         if (materializationID != null) {
@@ -559,7 +571,7 @@ public class ChangelogKeyedStateBackend<K>
             // change if it is not relevant anymore. Otherwise, it could DISCARD a newer upload
             // instead of the previous one. Rely on truncation for the cleanup in this case.
             // This might change if the log ownership changes (the method won't likely be needed).
-            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
+            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo, checkpointId);
         }
         // TODO: Consider notifying nested state backend about checkpoint abortion (FLINK-25850)
     }
diff --git a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
index 6a6b71868b5..12b5d3388f2 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
@@ -70,6 +70,7 @@ class ChangelogTruncateHelper {
             subsumedUpTo = sqn;
             checkpointedUpTo.headMap(checkpointId, true).clear();
             truncate();
+            stateChangelogWriter.subsume(checkpointId);
         }
     }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index 6a3a41bccd1..c26a736ddb9 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -190,7 +191,8 @@ public class ChangelogStateBackendTestUtils {
                                 new ChangelogStorageMetricGroup(
                                         UnregisteredMetricGroups
                                                 .createUnregisteredTaskManagerJobMetricGroup()),
-                                TaskChangelogRegistry.NO_OP))
+                                TaskChangelogRegistry.NO_OP,
+                                TestLocalRecoveryConfig.disabled()))
                 .build();
     }
 
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
index 26855819f1e..1e69afe2f9b 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.FsStateChangelogStorage;
 import org.apache.flink.changelog.fs.StateChangeSet;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
@@ -39,6 +40,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -128,7 +130,8 @@ public class ChangelogStateDiscardTest {
                 TaskChangelogRegistry.defaultChangelogRegistry(directExecutor());
         final TestingUploadScheduler scheduler = new TestingUploadScheduler(registry);
         singleBackendTest(
-                new FsStateChangelogStorage(scheduler, 0L, registry),
+                new FsStateChangelogStorage(
+                        scheduler, 0L, registry, TestLocalRecoveryConfig.disabled()),
                 (backend, writer) -> {
                     changeAndLogRandomState(backend, scheduler.uploads::size);
                     truncate(writer, backend);
@@ -185,7 +188,8 @@ public class ChangelogStateDiscardTest {
                 TaskChangelogRegistry.defaultChangelogRegistry(directExecutor());
         final TestingUploadScheduler scheduler = new TestingUploadScheduler(registry);
         final StateChangelogStorage<?> storage =
-                new FsStateChangelogStorage(scheduler, 0, registry);
+                new FsStateChangelogStorage(
+                        scheduler, 0, registry, TestLocalRecoveryConfig.disabled());
         final StateChangelogWriter<?>
                 w1 = storage.createWriter("test-operator-1", kgRange, new SyncMailboxExecutor()),
                 w2 = storage.createWriter("test-operator-2", kgRange, new SyncMailboxExecutor());
@@ -222,7 +226,10 @@ public class ChangelogStateDiscardTest {
         long preEmptivePersistThresholdInBytes = 0L; // flush ASAP
         singleBackendTest(
                 new FsStateChangelogStorage(
-                        directScheduler(uploader), preEmptivePersistThresholdInBytes, registry),
+                        directScheduler(uploader),
+                        preEmptivePersistThresholdInBytes,
+                        registry,
+                        TestLocalRecoveryConfig.disabled()),
                 (backend, writer) -> testCase.accept(backend, writer, uploader));
     }
 
@@ -367,14 +374,15 @@ public class ChangelogStateDiscardTest {
             results.add(handle);
             // todo: avoid making StateChangeSet and its internals public?
             // todo: make the contract more explicit or extract common code
-            Map<UploadTask, Map<StateChangeSet, Long>> taskOffsets =
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> taskOffsets =
                     tasks.stream().collect(toMap(identity(), this::mapOffsets));
             tasks.forEach(task -> startTracking(registry, handle, task));
             return new UploadTasksResult(taskOffsets, handle);
         }
 
-        private Map<StateChangeSet, Long> mapOffsets(UploadTask task) {
-            return task.getChangeSets().stream().collect(Collectors.toMap(identity(), ign -> 0L));
+        private Map<StateChangeSet, Tuple2<Long, Long>> mapOffsets(UploadTask task) {
+            return task.getChangeSets().stream()
+                    .collect(Collectors.toMap(identity(), ign -> Tuple2.of(0L, 0L)));
         }
 
         @Override
diff --git a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 70604262496..9813409475f 100644
--- a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -128,10 +128,13 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         public void truncate(SequenceNumber to) {}
 
         @Override
-        public void confirm(SequenceNumber from, SequenceNumber to) {}
+        public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {}
 
         @Override
-        public void reset(SequenceNumber from, SequenceNumber to) {}
+        public void subsume(long checkpointId) {}
+
+        @Override
+        public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {}
 
         @Override
         public void truncateAndClose(SequenceNumber from) {}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
index 1f770de6066..3b0693cd3ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
@@ -150,11 +150,9 @@ public class ChangelogLocalRecoveryITCase extends TestLogger {
                 () ->
                         miniCluster
                                 .getExecutionGraph(firstJobGraph.getJobID())
-                                .get(10000, TimeUnit.SECONDS),
+                                .get(500, TimeUnit.SECONDS),
                 false);
-
-        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false);
-        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID());
     }
 
     private StreamExecutionEnvironment getEnv(