Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/26 12:16:53 UTC

[GitHub] [flink] Myasuka commented on a diff in pull request #19550: [FLINK-25511][state/changelog] Discard pre-emptively uploaded state changes not included into any checkpoint

Myasuka commented on code in PR #19550:
URL: https://github.com/apache/flink/pull/19550#discussion_r858603609


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java:
##########
@@ -73,6 +74,11 @@ public Optional<byte[]> asBytesIfInMemory() {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(filePath.toUri().toString());

Review Comment:
   Can we store the `PhysicalStateHandleID` in a private field? It would be `null` by default and initialized on first access. The same applies to `ByteStreamStateHandle`.
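
   A minimal sketch of the suggested lazy initialization, using hypothetical stand-ins (`PhysicalId`, `LazyFileHandle`) instead of Flink's `PhysicalStateHandleID` and `FileStateHandle`. Marking the cached field `transient` is an assumption here, not something the diff decides; the racy single-check idiom is tolerable only because the ID is immutable and cheap to recompute.

```java
import java.io.Serializable;
import java.net.URI;
import java.util.Objects;

// Hypothetical stand-in for PhysicalStateHandleID (the real class extends StringBasedID).
final class PhysicalId implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String key;

    PhysicalId(String key) {
        this.key = Objects.requireNonNull(key);
    }

    String getKey() {
        return key;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof PhysicalId && ((PhysicalId) o).key.equals(key);
    }

    @Override
    public int hashCode() {
        return key.hashCode();
    }
}

// Hypothetical, simplified file-backed handle showing a lazily initialized ID.
class LazyFileHandle implements Serializable {
    private static final long serialVersionUID = 1L;

    private final URI filePath;

    // null until first access; transient (an assumption) so it is not serialized
    // and is recomputed on first access after deserialization.
    private transient PhysicalId physicalId;

    LazyFileHandle(URI filePath) {
        this.filePath = Objects.requireNonNull(filePath);
    }

    PhysicalId getStreamStateHandleID() {
        PhysicalId id = physicalId; // read the racy field only once
        if (id == null) {
            // Concurrent callers may each build an equal, immutable ID; that is harmless.
            id = new PhysicalId(filePath.toString());
            physicalId = id;
        }
        return id;
    }
}
```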



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+import java.util.UUID;
+
+/**
+ * Unique ID that allows for physical comparison between state handles.
+ *
+ * <p>Different state objects (e.g. different files) representing the same piece of data must have
+ * different IDs (e.g. file names). This is different from {@link
+ * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID.
+ *
+ * @see StateHandleID
+ */
+public class PhysicalStateHandleID extends StringBasedID {
+
+    private static final long serialVersionUID = 1L;
+
+    public PhysicalStateHandleID(String keyString) {
+        super(keyString);
+    }
+
+    public static PhysicalStateHandleID randomStateHandleId() {

Review Comment:
   Is this method ever used?



##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+class ChangelogTruncateHelper {

Review Comment:
   Can we add some docs for this class and its methods? To be honest, introducing this class does give us a simple public API. However, it lacks information about when and why these methods are called. Since they are only called from `ChangelogKeyedStateBackend`, maybe moving them back into `ChangelogKeyedStateBackend` would make the logic clearer for others to understand.
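
   To illustrate the kind of documentation being asked for, here is a hypothetical sketch of a truncate helper whose methods state when they are called. The method names (`checkpoint`, `checkpointSubsumed`, `materialized`), the `TruncatableLog` interface, and the rule "truncate up to the minimum of the subsumed and materialized positions" are assumptions for illustration, not taken from this diff; sequence numbers are simplified to `long`.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the part of StateChangelogWriter the helper needs. */
interface TruncatableLog {
    /** Drops all changes strictly before {@code to}. */
    void truncate(long to);
}

/**
 * Hypothetical helper. Assumed rule: the log is truncated only up to the smaller of
 * (1) the position uploaded for the latest subsumed checkpoint and
 * (2) the position covered by the latest materialization.
 */
class TruncateHelperSketch {
    private final TruncatableLog log;
    // checkpoint ID -> last sequence number uploaded for that checkpoint
    private final NavigableMap<Long, Long> checkpointedUpTo = new TreeMap<>();
    private Long subsumedUpTo;
    private Long materializedUpTo;

    TruncateHelperSketch(TruncatableLog log) {
        this.log = log;
    }

    /** Called when a checkpoint is taken; records how far the changelog was uploaded. */
    void checkpoint(long checkpointId, long lastUploadedTo) {
        checkpointedUpTo.put(checkpointId, lastUploadedTo);
    }

    /** Called when a checkpoint is subsumed, i.e. no longer needed for recovery. */
    void checkpointSubsumed(long checkpointId) {
        Long upTo = checkpointedUpTo.get(checkpointId);
        if (upTo != null) {
            subsumedUpTo = upTo;
            // This checkpoint and all older ones are no longer tracked.
            checkpointedUpTo.headMap(checkpointId, true).clear();
            truncateIfPossible();
        }
    }

    /** Called when materialization completes up to {@code upTo}. */
    void materialized(long upTo) {
        materializedUpTo = upTo;
        truncateIfPossible();
    }

    private void truncateIfPossible() {
        // Both positions must be known before anything can be truncated.
        if (subsumedUpTo != null && materializedUpTo != null) {
            log.truncate(Math.min(subsumedUpTo, materializedUpTo));
        }
    }
}
```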



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {

Review Comment:
   Checkpoint-completion notifications are not guaranteed to always arrive. Is it always safe to call this method here, given that `stopTracking` might have been missed?
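
   The concern can be made concrete with a simplified model of the registry above (String keys instead of `PhysicalStateHandleID`, and a hypothetical `discard` callback). Assuming `stopTracking` is only triggered by the checkpoint-completion notification, losing that notification means the last `notUsed` call discards a file that a completed checkpoint may still reference. The discard-when-unused step follows the `notUsed` javadoc in `ChangelogRegistry`; the rest of the real method body is not visible here.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

/** Simplified, hypothetical model of ChangelogRegistryImpl. */
class RegistryModel {
    private final Map<String, Set<UUID>> entries = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Consumer<String> discard; // hypothetical discard action

    RegistryModel(Executor executor, Consumer<String> discard) {
        this.executor = executor;
        this.discard = discard;
    }

    void startTracking(String handleId, Set<UUID> backendIds) {
        entries.put(handleId, new CopyOnWriteArraySet<>(backendIds));
    }

    /** Assumed to be called once a checkpoint containing the handle is confirmed. */
    void stopTracking(String handleId) {
        entries.remove(handleId);
    }

    void notUsed(String handleId, UUID backendId) {
        Set<UUID> backends = entries.get(handleId);
        if (backends == null) {
            return; // not tracked (any more): another component owns the state
        }
        backends.remove(backendId);
        // remove(key, value) ensures only one caller wins the race to discard.
        if (backends.isEmpty() && entries.remove(handleId, backends)) {
            executor.execute(() -> discard.accept(handleId));
        }
    }
}
```

   With a direct executor (`Runnable::run`), calling `startTracking` and then `notUsed` without an intervening `stopTracking` discards the file immediately; only a prior `stopTracking` prevents it.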



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java:
##########
@@ -73,12 +80,19 @@
      */
     void reset(SequenceNumber from, SequenceNumber to);
 
+    /**
+     * Truncate the tail of log and close it. No new appends will be possible. Any appended but not
+     * persisted records will be lost.
+     *
+     * @param from {@link SequenceNumber} from which to truncate the changelog, inclusive
+     */
+    default void truncateAndClose(SequenceNumber from) {

Review Comment:
   `StateChangelogWriter` is an internal interface; do we still need to introduce a default method here?
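
   For comparison, a hypothetical trimmed-down writer where `truncateAndClose` is abstract rather than `default`, with an in-memory implementation following the javadoc above (truncate from `from`, inclusive, then reject further appends). The interface and class names are made up for this sketch, and sequence numbers are simplified to `long`.

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical writer interface: truncateAndClose is abstract, not default. */
interface SketchChangelogWriter extends AutoCloseable {
    void append(long sequenceNumber, String change);

    /** Truncates the tail of the log from {@code from} (inclusive) and closes the writer. */
    void truncateAndClose(long from);

    @Override
    void close();
}

/** Hypothetical in-memory implementation; it must implement truncateAndClose itself. */
class InMemoryChangelogWriter implements SketchChangelogWriter {
    private final NavigableMap<Long, String> changes = new TreeMap<>();
    private boolean closed;

    @Override
    public void append(long sequenceNumber, String change) {
        if (closed) {
            throw new IllegalStateException("writer is closed");
        }
        changes.put(sequenceNumber, change);
    }

    @Override
    public void truncateAndClose(long from) {
        changes.tailMap(from, true).clear(); // drop the tail, including 'from'
        close();
    }

    @Override
    public void close() {
        closed = true;
    }

    NavigableMap<Long, String> changes() {
        return Collections.unmodifiableNavigableMap(changes);
    }
}
```

   Since every implementation is internal to Flink, an abstract method makes the compiler flag any implementation that does not provide it, whereas a `default` body would be inherited silently.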



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {

Review Comment:
   Actually, `ChangelogRegistry` is only used for changelogs that have not yet been confirmed by a checkpoint. However, the current name and API don't convey this.
   How about renaming this class to `UnconfirmedChangelogRegistry`?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistryImpl.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executor;
+
+@Internal
+@ThreadSafe
+class ChangelogRegistryImpl implements ChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogRegistryImpl.class);
+
+    private final Map<PhysicalStateHandleID, Set<UUID>> entries = new ConcurrentHashMap<>();
+    private final Executor executor;
+
+    public ChangelogRegistryImpl(Executor executor) {
+        this.executor = executor;
+    }
+
+    @Override
+    public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {
+        LOG.debug(
+                "start tracking state, key: {}, state: {}",
+                handle.getStreamStateHandleID(),
+                handle);
+        entries.put(handle.getStreamStateHandleID(), new CopyOnWriteArraySet<>(backendIDs));
+    }
+
+    @Override
+    public void stopTracking(StreamStateHandle handle) {
+        LOG.debug(
+                "stop tracking state, key: {}, state: {}", handle.getStreamStateHandleID(), handle);
+        entries.remove(handle.getStreamStateHandleID());
+    }
+
+    @Override
+    public void notUsed(StreamStateHandle handle, UUID backendId) {
+        PhysicalStateHandleID key = handle.getStreamStateHandleID();
+        LOG.debug("backend {} not using state, key: {}, state: {}", backendId, key, handle);
+        Set<UUID> backends = entries.get(key);
+        if (backends == null) {
+            LOG.warn("backend {} was not using state, key: {}, state: {}", backendId, key, handle);

Review Comment:
   How does this log statement differ from the debug statement above?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/ChangelogRegistry.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/**
+ * Registry of changelog segments uploaded by {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogWriter StateChangelogWriters} of a {@link
+ * org.apache.flink.runtime.state.changelog.StateChangelogStorage StateChangelogStorage}.
+ */
+@Internal
+public interface ChangelogRegistry {
+
+    /** Start tracking the state uploaded for the given backends. */
+    void startTracking(StreamStateHandle handle, Set<UUID> backendIDs);
+
+    /** Stop tracking the state, so that it's not tracked (some other component is doing that). */
+    void stopTracking(StreamStateHandle handle);
+
+    /**
+     * Mark the state as unused by the given backend, e.g. if it was pre-emptively uploaded and
+     * materialized. Once no backend is using the state, it is discarded (unless it was {@link
+     * #stopTracking(StreamStateHandle) unregistered} earlier).
+     */
+    void notUsed(StreamStateHandle handle, UUID backendId);
+
+    ChangelogRegistry NO_OP =
+            new ChangelogRegistry() {
+                @Override
+                public void startTracking(StreamStateHandle handle, Set<UUID> backendIDs) {}
+
+                @Override
+                public void stopTracking(StreamStateHandle handle) {}
+
+                @Override
+                public void notUsed(StreamStateHandle handle, UUID backendId) {}
+            };
+
+    static ChangelogRegistry defaultChangelogRegistry(int numAsyncDiscardThreads) {
+        return defaultChangelogRegistry(Executors.newFixedThreadPool(numAsyncDiscardThreads));
+    }
+
+    static ChangelogRegistry defaultChangelogRegistry(Executor executor) {

Review Comment:
   Should add `@VisibleForTesting`.



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -270,7 +286,25 @@ public void truncate(SequenceNumber to) {
         checkArgument(to.compareTo(activeSequenceNumber) <= 0);
         lowestSequenceNumber = to;
         notUploaded.headMap(lowestSequenceNumber, false).clear();
-        uploaded.headMap(lowestSequenceNumber, false).clear();
+
+        Map<SequenceNumber, UploadResult> toDiscard = uploaded.headMap(to);
+        LOG.trace("Uploaded state to discard: {}", toDiscard);
+        for (UploadResult result : toDiscard.values()) {
+            changelogRegistry.notUsed(result.streamStateHandle, logId);
+        }
+        toDiscard.clear();

Review Comment:
   We could extract a method and reuse it in both `truncate` and `truncateAndClose`.
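
   One possible shape for the shared method, as a hypothetical sketch: `String` stands in for `StreamStateHandle`, a `BiConsumer` stands in for `changelogRegistry::notUsed`, and `notUseAndClear` is an invented name. It relies on `headMap`/`tailMap` returning live views, so clearing the view also removes the entries from `uploaded`. The `notUploaded` bookkeeping and the actual closing are omitted.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiConsumer;

/** Hypothetical sketch: one helper shared by truncate() and truncateAndClose(). */
class UploadedChangesSketch {
    // sequence number -> uploaded file (String stands in for StreamStateHandle)
    private final NavigableMap<Long, String> uploaded = new TreeMap<>();
    private final UUID logId;
    private final BiConsumer<String, UUID> notUsed; // stands in for changelogRegistry::notUsed

    UploadedChangesSketch(UUID logId, BiConsumer<String, UUID> notUsed) {
        this.logId = logId;
        this.notUsed = notUsed;
    }

    void onUploaded(long sequenceNumber, String handle) {
        uploaded.put(sequenceNumber, handle);
    }

    /** Drops uploaded changes strictly before {@code to}. */
    void truncate(long to) {
        notUseAndClear(uploaded.headMap(to, false));
    }

    /** Drops uploaded changes from {@code from}, inclusive (closing is omitted here). */
    void truncateAndClose(long from) {
        notUseAndClear(uploaded.tailMap(from, true));
    }

    /** Reports every handle in the view as unused, then clears the view. */
    private void notUseAndClear(Map<Long, String> toDiscard) {
        for (String handle : toDiscard.values()) {
            notUsed.accept(handle, logId);
        }
        toDiscard.clear(); // the view is live, so this also removes entries from 'uploaded'
    }

    int size() {
        return uploaded.size();
    }
}
```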



-- 
This is an automated message from the Apache Git Service.