Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/21 19:13:19 UTC

[GitHub] [flink] rkhachatryan opened a new pull request, #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

rkhachatryan opened a new pull request, #19550:
URL: https://github.com/apache/flink/pull/19550

   ## What is the purpose of the change
   
   Discard pre-emptively uploaded state changes not included in any checkpoint.
   
   ## Verifying this change
   
   - `ChangelogRegistryImplTest`
   - `ChangelogStateDiscardTest`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] rkhachatryan commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1112249471

   Thanks a lot for the thorough review @Myasuka,
   I'll squash the commits and merge the PR once the build completes.




[GitHub] [flink] curcur commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
curcur commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1223813657

   That one is a bit different: it is about cleaning up JM-owned shared files.
   The proposed solution won't solve this problem.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858733870


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+import java.util.UUID;
+
+/**
+ * Unique ID that allows for physical comparison between state handles.
+ *
+ * <p>Different state objects (e.g. different files) representing the same piece of data must have
+ * different IDs (e.g. file names). This is different from {@link
+ * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID.
+ *
+ * @see StateHandleID
+ */
+public class PhysicalStateHandleID extends StringBasedID {
+
+    private static final long serialVersionUID = 1L;
+
+    public PhysicalStateHandleID(String keyString) {
+        super(keyString);
+    }
+
+    public static PhysicalStateHandleID randomStateHandleId() {

Review Comment:
   No, I'll remove it.





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r860811183


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   I think the test below should also be verified:
   ~~~java
   changeAndLogRandomState(backend, uploader.results::size);
   checkpoint(backend, 1L);
   materialize(backend, writer);
   assertRetained(uploader.results); // may be used in non-subsumed checkpoints
   ~~~
   
   Even if we call `materialize` after `checkpoint`, the uploader results are still there, since we have not received the checkpoint-complete message.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858718454


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   I'd rather initialize it by default - that would be simpler, WDYT?





[GitHub] [flink] rkhachatryan merged pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan merged PR #19550:
URL: https://github.com/apache/flink/pull/19550




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858803313


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   I agree, this info is missing.
   But maybe even more informative would be `TaskChangelogRegistry`. That would highlight that the ownership is either on JM or on TM. 
   
   WDYT?
   (`UnconfirmedChangelogRegistry` is also fine to me)





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859607072


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   I think the deserialization also happens during checkpoint recovery, and that would also be fine. Thus, I think it's okay to initialize the field directly.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858823012


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+class ChangelogTruncateHelper {

Review Comment:
   Good point, I'll add the javadoc.
   The reason for adding a separate class was to avoid adding more responsibilities to the `ChangelogKeyedStateBackend`.
   Indeed, it's the only usage, so I think it doesn't make sense to make the class/API public.





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859362305


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   If the current sequence number is up to `100`, we then call `changelogTruncateHelper.materialized(upTo)` after materialization has finished, and so discard changelogs with `upTo` as `100`. However, if `stopTracking` was missed before, can we say the discarding is safe here? Consider the case where the next checkpoints are not successful, so the materialized parts are never used.
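
The scenario above can be made concrete with a small sketch. It assumes one plausible rule that is not confirmed in this thread: the changelog is truncated only up to the smaller of the materialized and the subsumed sequence numbers, so materialization alone discards nothing. `SketchTruncateHelper` and its plain `long` sequence numbers are hypothetical stand-ins for `ChangelogTruncateHelper` and `SequenceNumber`.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch; not the class from the PR. */
final class SketchTruncateHelper {
    // checkpoint ID -> last sequence number included in that checkpoint
    private final NavigableMap<Long, Long> checkpointedUpTo = new TreeMap<>();
    private Long subsumedUpTo;       // null until some checkpoint has been subsumed
    private Long materializedUpTo;   // null until the first materialization
    private long truncatedUpTo = 0L; // what would be passed on to the writer

    void checkpoint(long checkpointId, long lastUploadedTo) {
        checkpointedUpTo.put(checkpointId, lastUploadedTo);
    }

    void checkpointSubsumed(long checkpointId) {
        Long sqn = checkpointedUpTo.get(checkpointId);
        if (sqn != null) {
            subsumedUpTo = sqn;
            // Entries for this and all older checkpoints are no longer needed.
            checkpointedUpTo.headMap(checkpointId, true).clear();
            truncate();
        }
    }

    void materialized(long upTo) {
        materializedUpTo = upTo;
        truncate();
    }

    private void truncate() {
        // Assumed rule: both conditions must hold before anything is discarded.
        if (subsumedUpTo != null && materializedUpTo != null) {
            long subsumed = subsumedUpTo;
            long materialized = materializedUpTo;
            truncatedUpTo = Math.max(truncatedUpTo, Math.min(subsumed, materialized));
        }
    }

    long getTruncatedUpTo() {
        return truncatedUpTo;
    }
}
```

Under this assumed rule, after `checkpoint(1, 100)` a call to `materialized(100)` leaves the truncation point at 0; it only moves to 100 once `checkpointSubsumed(1)` arrives.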





[GitHub] [flink] curcur commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
curcur commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1223806998

   OK, so leftovers will remain after failover.
   
   Who will be responsible for deleting them? Are they in the task-owned folder?
   
   Will the task-owned folder be deleted after a job is stopped?




[GitHub] [flink] rkhachatryan commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1223798218

   @curcur Yuan, no, it's not persisted.




[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858603609


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   Can we make the physicalStateHandleId a private field? It would be `null` by default and initialized on the first access. The same applies to `ByteStreamStateHandle`.
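
A minimal sketch of the lazy-initialization pattern suggested here. `SketchFileHandle` and `SketchPhysicalId` are hypothetical stand-ins for `FileStateHandle` and `PhysicalStateHandleID`; marking the cached field `transient` is an assumption of this sketch, made so that the ID is simply recomputed after deserialization.

```java
import java.io.Serializable;
import java.net.URI;

/** Hypothetical stand-in for a string-based physical ID. */
final class SketchPhysicalId implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String key;

    SketchPhysicalId(String key) {
        this.key = key;
    }

    String getKey() {
        return key;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof SketchPhysicalId && ((SketchPhysicalId) o).key.equals(key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

/** Hypothetical stand-in for a file-based state handle. */
final class SketchFileHandle implements Serializable {
    private static final long serialVersionUID = 1L;
    private final URI fileUri;

    // Null by default; created on first access and reused afterwards.
    private transient SketchPhysicalId physicalId;

    SketchFileHandle(URI fileUri) {
        this.fileUri = fileUri;
    }

    SketchPhysicalId getStreamStateHandleID() {
        if (physicalId == null) {
            // Racing threads may each create an instance, but all instances are equal.
            physicalId = new SketchPhysicalId(fileUri.toString());
        }
        return physicalId;
    }
}
```

The alternative discussed in this thread, initializing the field eagerly in the constructor, avoids the null check at the cost of always allocating the ID.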



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+import java.util.UUID;
+
+/**
+ * Unique ID that allows for physical comparison between state handles.
+ *
+ * <p>Different state objects (e.g. different files) representing the same piece of data must have
+ * different IDs (e.g. file names). This is different from {@link
+ * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID.
+ *
+ * @see StateHandleID
+ */
+public class PhysicalStateHandleID extends StringBasedID {
+
+    private static final long serialVersionUID = 1L;
+
+    public PhysicalStateHandleID(String keyString) {
+        super(keyString);
+    }
+
+    public static PhysicalStateHandleID randomStateHandleId() {

Review Comment:
   Is this method ever used?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+class ChangelogTruncateHelper {

Review Comment:
   Can we add some docs for this class and its methods? To be honest, introducing this class gives us a simple public API, but we lack information on when and why these methods are called. As they are only called within `ChangelogKeyedStateBackend`, maybe moving them back into `ChangelogKeyedStateBackend` would make the logic clearer for others to understand.



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   The notification of checkpoint completion cannot always be guaranteed. Is it always safe to call this method here if `stopTracking` was missed?
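
How `notUsed` interacts with a missed `stopTracking` can be illustrated with a sketch. The body of `notUsed` is not shown in the quoted diff, so the logic below is an assumption: a segment is discarded once the last backend that was using it reports it as unused, unless tracking was stopped first. `SketchRegistry` is hypothetical, uses plain string keys instead of `PhysicalStateHandleID`, and records discards in a list instead of deleting anything.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical sketch; not the class from the PR. */
final class SketchRegistry {
    private final Map<String, Set<UUID>> entries = new ConcurrentHashMap<>();

    // Stands in for the asynchronous discard of the underlying file.
    final List<String> discarded = Collections.synchronizedList(new ArrayList<>());

    void startTracking(String handleId, Set<UUID> backendIds) {
        entries.put(handleId, new CopyOnWriteArraySet<>(backendIds));
    }

    /** Ownership has moved elsewhere (e.g. to a confirmed checkpoint): never discard. */
    void stopTracking(String handleId) {
        entries.remove(handleId);
    }

    void notUsed(String handleId, UUID backendId) {
        Set<UUID> backends = entries.get(handleId);
        if (backends == null) {
            return; // not tracked (any more), so nothing to do
        }
        backends.remove(backendId);
        // Assumed rule: discard only when no backend references the segment.
        if (backends.isEmpty() && entries.remove(handleId) != null) {
            discarded.add(handleId);
        }
    }
}
```

In this sketch, a segment for which `stopTracking` was never called is discarded as soon as every backend has called `notUsed`, which is exactly the case the question above is about.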



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java:
##########
@@ -73,12 +80,19 @@
      */
     void reset(SequenceNumber from, SequenceNumber to);
 
+    /**
+     * Truncate the tail of log and close it. No new appends will be possible. Any appended but not
+     * persisted records will be lost.
+     *
+     * @param from {@link SequenceNumber} from which to truncate the changelog, inclusive
+     */
+    default void truncateAndClose(SequenceNumber from) {

Review Comment:
   `StateChangelogWriter` is an internal interface; do we still need to introduce a default method here?
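
For illustration, the trade-off can be sketched as follows. A default no-op lets existing implementations compile unchanged, while an implementation that cares overrides it with the semantics described in the Javadoc above (drop everything from `from` onwards, inclusive, and refuse further appends). `SketchWriter` and `InMemorySketchWriter` are hypothetical and use `long` sequence numbers; they are not the real `StateChangelogWriter`.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical, simplified writer interface. */
interface SketchWriter {
    long append(String change);

    /** Default no-op so that existing implementations do not have to change. */
    default void truncateAndClose(long from) {}
}

/** Hypothetical in-memory implementation that overrides the default. */
final class InMemorySketchWriter implements SketchWriter {
    private final NavigableMap<Long, String> log = new TreeMap<>();
    private long nextSqn = 0L;
    private boolean closed;

    @Override
    public long append(String change) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        log.put(nextSqn, change);
        return nextSqn++;
    }

    @Override
    public void truncateAndClose(long from) {
        // Remove the tail of the log, starting at 'from' (inclusive).
        log.tailMap(from, true).clear();
        closed = true;
    }

    int size() {
        return log.size();
    }
}
```

If the interface is internal and every implementation lives in the same code base, declaring the method abstract instead would force each implementation to state its behavior explicitly.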



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   Actually, `ChangelogRegistry` is only used for changelogs which have not been confirmed in checkpoints. However, the current name and API lack this information.
   How about renaming this class to `UnconfirmedChangelogRegistry`?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {
+        PhysicalStateHandleID key = handle.getStreamStateHandleID();
+        LOG.debug("backend {} not using state, key: {}, state: {}", backendId, key, handle);
+        Set<UUID> backends = entries.get(key);
+        if (backends == null) {
+            LOG.warn("backend {} was not using state, key: {}, state: {}", backendId, key, handle);

Review Comment:
   What's the difference between this log statement and the debug statement above?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {
+
+    /** Start tracking the state uploaded for the given backends. */
+    void startTracking(StreamStateHandle handle, Set<UUID> backendIDs);
+
+    /** Stop tracking the state, so that it's not tracked (some other component is doing that). */
+    void stopTracking(StreamStateHandle handle);
+
+    /**
+     * Mark the state as unused by the given backend, e.g. if it was pre-emptively uploaded and
+     * materialized. Once no backend is using the state, it is discarded (unless it was {@link
+     * #stopTracking(StreamStateHandle) unregistered} earlier).
+     */
+    void notUsed(StreamStateHandle handle, UUID backendId);
+
+    ChangelogRegistry NO_OP =
+            new ChangelogRegistry() {
+                @Override
+                public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {}
+
+                @Override
+                public void stopTracking(StreamStateHandle handle) {}
+
+                @Override
+                public void notUsed(StreamStateHandle handle, UUID backendId) {}
+            };
+
+    static ChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads) {
+        return defaultChangelogRegistry(Executors.newFixedThreadPool(numAsyncDiscardThreads));
+    }
+
+    static ChangelogRegistry defaultChangelogRegistry(Executor executor) {

Review Comment:
   This should be annotated with `@VisibleForTesting`.



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -270,7 +286,25 @@ public void truncate(SequenceNumber to) {
         checkArgument(to.compareTo(activeSequenceNumber) <= 0);
         lowestSequenceNumber = to;
         notUploaded.headMap(lowestSequenceNumber, false).clear();
-        uploaded.headMap(lowestSequenceNumber, false).clear();
+
+        Map<SequenceNumber, UploadResult> toDiscard = uploaded.headMap(to);
+        LOG.trace("Uploaded state to discard: {}", toDiscard);
+        for (UploadResult result : toDiscard.values()) {
+            changelogRegistry.notUsed(result.streamStateHandle, logId);
+        }
+        toDiscard.clear();

Review Comment:
   We can introduce a method to be reused in `truncate` and `truncateAndClose`.
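
The suggested refactoring could look roughly like the sketch below. It is a simplified, hypothetical stand-in rather than the actual `FsStateChangelogWriter`: sequence numbers are plain `long`s, handles are strings, the `Consumer` stands in for `changelogRegistry.notUsed(handle, logId)`, and the names `WriterSketch`, `recordUpload` and `releaseUploadedBefore` are made up for illustration. What `truncateAndClose` does besides discarding is not shown in this thread, so the `closed` flag is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical, simplified writer; not the real FsStateChangelogWriter. */
final class WriterSketch {
    private final NavigableMap<Long, String> uploaded = new TreeMap<>();
    private final Consumer<String> notUsed; // stands in for changelogRegistry.notUsed(handle, logId)
    private boolean closed;

    WriterSketch(Consumer<String> notUsed) {
        this.notUsed = notUsed;
    }

    void recordUpload(long sqn, String handle) {
        uploaded.put(sqn, handle);
    }

    void truncate(long to) {
        releaseUploadedBefore(to);
    }

    void truncateAndClose(long from) {
        releaseUploadedBefore(from);
        closed = true; // assumption: the real method also closes the writer
    }

    boolean isClosed() {
        return closed;
    }

    /** Shared by both truncate paths: release every upload strictly below the bound. */
    private void releaseUploadedBefore(long to) {
        NavigableMap<Long, String> toDiscard = uploaded.headMap(to, false);
        toDiscard.values().forEach(notUsed);
        toDiscard.clear(); // the head map is a view, so this also removes from 'uploaded'
    }

    public static void main(String[] args) {
        List<String> released = new ArrayList<>();
        WriterSketch writer = new WriterSketch(released::add);
        writer.recordUpload(1L, "a");
        writer.recordUpload(2L, "b");
        writer.recordUpload(3L, "c");
        writer.truncate(2L);
        System.out.println(released); // only "a" is below 2
        writer.truncateAndClose(4L);
        System.out.println(released); // "b" and "c" are released as well
    }
}
```

Keeping the release logic in one private method means both truncate paths notify the registry in the same way.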





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858799446


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   Good question.
   
   It is the client's responsibility to call `notUsed` only when it is actually safe to do so. This is achieved by calling `notUsed` on checkpoint **subsumption** notification. And once a checkpoint is subsumed, the notification about its **completion** no longer matters.
   
   And when there are several backends involved, it is the responsibility of the Registry to wait for a `notUsed()` call from **all** backends before discarding the state (and not to discard it if at least one called `stopTracking()`).
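
The contract described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the PR: handles are plain strings instead of `StreamStateHandle`, discards are recorded in a list instead of being executed asynchronously, and the class name `RegistrySketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;

/** Hypothetical, simplified registry; handles are plain strings here. */
final class RegistrySketch {
    private final Map<String, Set<UUID>> entries = new ConcurrentHashMap<>();
    // Records discards instead of deleting anything (assumption for illustration).
    final List<String> discarded = new CopyOnWriteArrayList<>();

    void startTracking(String handle, Set<UUID> backendIds) {
        entries.put(handle, new CopyOnWriteArraySet<>(backendIds));
    }

    void stopTracking(String handle) {
        entries.remove(handle); // some other component owns the state from now on
    }

    void notUsed(String handle, UUID backendId) {
        Set<UUID> backends = entries.get(handle);
        if (backends == null) {
            return; // never tracked, or stopTracking() was called: do not discard
        }
        backends.remove(backendId);
        // Discard only after the last backend has released the handle.
        if (backends.isEmpty() && entries.remove(handle, backends)) {
            discarded.add(handle);
        }
    }

    public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        RegistrySketch registry = new RegistrySketch();

        registry.startTracking("segment-1", new HashSet<>(Arrays.asList(a, b)));
        registry.notUsed("segment-1", a);
        System.out.println(registry.discarded.isEmpty()); // true: b still uses it
        registry.notUsed("segment-1", b);
        System.out.println(registry.discarded.contains("segment-1")); // true

        registry.startTracking("segment-2", new HashSet<>(Arrays.asList(a)));
        registry.stopTracking("segment-2");
        registry.notUsed("segment-2", a);
        System.out.println(registry.discarded.contains("segment-2")); // false
    }
}
```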





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r860918146


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   Added `testPreEmptiveUploadNotDiscardedWithoutNotification` in 5ea7f3650ee.





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859355932


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   I think `TaskChangelogRegistry` works, and we should give an explicit description that the ownership is **neither** on the JM nor on the TM.
   
   



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   I think `TaskChangelogRegistry` is also fine, and we should give an explicit description that the ownership is **neither** on the JM nor on the TM.
   
   





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r860637638


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   Isn't this covered in `testPreEmptiveUploadDiscardedOnSubsumption`:
   ```
   materialize(backend, writer);
   checkpoint(backend, 1L);
   assertRetained(uploader.results); // may be used in non-subsumed checkpoints
   ```
   ?





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859349613


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   In general, when we add a new field to a Serializable class, we need to consider a possible NPE when deserializing previously existing objects. That's why I suggested making it `null` by default. I checked the places where `FileStateHandle` is deserialized, and it should be fine to initialize it directly here.
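
The concern can be illustrated with a small sketch. It uses a `transient` field as a stand-in, because a field that is absent from the serialized stream is left at its default value (`null`) after deserialization, which is also what happens to a field added after an object was written. The class `HandleSketch` and its methods are hypothetical; deriving the ID on demand mirrors the approach in the diff above, where the ID is computed from the file path on each call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical handle used only to illustrate the deserialization concern. */
final class HandleSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String path;
    // Not part of the serialized form, so it is null after deserialization,
    // like a field that did not exist when an older object was written.
    private transient String cachedId;

    HandleSketch(String path) {
        this.path = path;
        this.cachedId = "id:" + path;
    }

    /** Risky: may return null for a deserialized instance. */
    String cachedId() {
        return cachedId;
    }

    /** Safe: derived from a field that is always present in the stream. */
    String derivedId() {
        return "id:" + path;
    }

    static HandleSketch roundTrip(HandleSketch handle) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (HandleSketch) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HandleSketch copy = roundTrip(new HandleSketch("/checkpoints/chk-1/file"));
        System.out.println(copy.cachedId());  // null
        System.out.println(copy.derivedId()); // id:/checkpoints/chk-1/file
    }
}
```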





[GitHub] [flink] curcur commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
curcur commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1223795084

   @rkhachatryan Hey Roman, I'm wondering whether `ChangelogRegistry` is persisted (so that it can be recovered after a failover)?
   
   
   




[GitHub] [flink] flinkbot commented on pull request #19550: [FLINK-25512][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1105658573

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "0a5c99b00a3308cbe030b8ee43ed627a06c4f295",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "0a5c99b00a3308cbe030b8ee43ed627a06c4f295",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 0a5c99b00a3308cbe030b8ee43ed627a06c4f295 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] rkhachatryan commented on pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on PR #19550:
URL: https://github.com/apache/flink/pull/19550#issuecomment-1223812399

   If a TM crashes during an upload or while running, some state might remain.
   In all such cases, the user is responsible for cleaning up the leftover state.
   There is a ticket FLINK-24852 to address this.




[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859586131


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   I think deserialization only happens when passing handles between TM and JM - and there it's fine, the field will not be null.
   Is that what you mean?





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r859630417


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   > we then call changelogTruncateHelper.materialized(upTo) after materialization finished. 
   > Thus, we would discard change logs with upTo as 100
   
   Let's assume there is some valid or pending checkpoint that uses sqn <= 100, but the JM completion notification was lost, and therefore `stopTracking` was not called.
   
   If that checkpoint is valid/pending, then neither it nor any newer checkpoint can have been subsumed.
   Therefore, `ChangelogTruncateHelper.subsumedUpTo` hasn't been advanced yet.
   Therefore, the discard will not happen at this point, because `ChangelogTruncateHelper` truncates the logs only up to `min(materializedUpTo, subsumedUpTo)`.
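
The argument above can be sketched in code. This is a hypothetical simplification, not the actual `ChangelogTruncateHelper`: sequence numbers are plain `long`s, and the class and method names are illustrative only.

```java
/** Hypothetical sketch of the min(materializedUpTo, subsumedUpTo) rule. */
final class TruncateSketch {
    private long materializedUpTo = 0L;
    private long subsumedUpTo = 0L;
    private long truncatedUpTo = 0L;

    void materialized(long upTo) {
        materializedUpTo = Math.max(materializedUpTo, upTo);
        truncate();
    }

    void checkpointSubsumed(long upTo) {
        subsumedUpTo = Math.max(subsumedUpTo, upTo);
        truncate();
    }

    long truncatedUpTo() {
        return truncatedUpTo;
    }

    private void truncate() {
        // Changes are safe to drop only if they are both materialized and
        // no longer referenced by any non-subsumed checkpoint.
        long to = Math.min(materializedUpTo, subsumedUpTo);
        if (to > truncatedUpTo) {
            truncatedUpTo = to;
        }
    }

    public static void main(String[] args) {
        TruncateSketch helper = new TruncateSketch();
        helper.materialized(100L);
        System.out.println(helper.truncatedUpTo()); // 0: nothing subsumed yet
        helper.checkpointSubsumed(60L);
        System.out.println(helper.truncatedUpTo()); // 60
        helper.checkpointSubsumed(150L);
        System.out.println(helper.truncatedUpTo()); // 100: bounded by materialization
    }
}
```

In the scenario from the quoted question, materialization up to 100 alone leaves the truncation point unchanged, so nothing is discarded until a subsumption notification arrives.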





[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r860430325


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   I see, and I think this is valid. However, this guarantee does not seem to be tested in `ChangelogStateDiscardTest`.





[GitHub] [flink] rkhachatryan commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Posted by GitBox <gi...@apache.org>.
rkhachatryan commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858732789


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {
+        PhysicalStateHandleID key = handle.getStreamStateHandleID();
+        LOG.debug("backend {} not using state, key: {}, state: {}", backendId, key, handle);
+        Set<UUID> backends = entries.get(key);
+        if (backends == null) {
+            LOG.warn("backend {} was not using state, key: {}, state: {}", backendId, key, handle);

Review Comment:
   In the 2nd case, the state is **already** not tracked for the given backend.
   As this is a normal case (it can happen after some other backend calls `stopTracking`), I'll remove the log message.
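
   To illustrate why a missing entry is not an error, here is a minimal sketch of the tracking semantics discussed here. It is not the PR's `ChangelogRegistryImpl`: the class name, the `String` keys standing in for `PhysicalStateHandleID`, and the boolean return value (standing in for scheduling a discard on the executor) are assumptions made for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical sketch, not Flink code: String keys replace PhysicalStateHandleID,
// and notUsed() returns a flag instead of scheduling a discard.
final class RegistrySketch {
    private final Map<String, Set<UUID>> entries = new ConcurrentHashMap<>();

    void startTracking(String key, Set<UUID> backendIds) {
        entries.put(key, new CopyOnWriteArraySet<>(backendIds));
    }

    void stopTracking(String key) {
        entries.remove(key);
    }

    // Returns true if the caller should discard the state now.
    boolean notUsed(String key, UUID backendId) {
        Set<UUID> backends = entries.get(key);
        if (backends == null) {
            // Already untracked, e.g. another backend called stopTracking.
            // This is a normal case, so nothing is logged or discarded.
            return false;
        }
        backends.remove(backendId);
        // Discard only when the last backend is gone and this call removed the entry.
        return backends.isEmpty() && entries.remove(key, backends);
    }
}
```

   In this sketch, calling `notUsed` after `stopTracking` is a silent no-op, which matches the behaviour described above once the warning is removed.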


