Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/13 00:33:58 UTC

[GitHub] [flink] rkhachatryan commented on a diff in pull request #20217: [FLINK-27693][state] Support local recovery for non-materialized part

rkhachatryan commented on code in PR #20217:
URL: https://github.com/apache/flink/pull/20217#discussion_r919521374


##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The
+ * difference is that this stream does not delete the file on {@link #close()}.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {

Review Comment:
   Is this class necessary? Can't the existing `DuplicatingCheckpointOutputStream` be used instead?



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The
+ * difference is that this stream does not delete the file on {@link #close()}.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+    private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+    private final OutputStream secondaryStream;
+
+    public DuplicatingOutputStreamWithPos(OutputStream primaryStream, OutputStream secondaryStream)
+            throws IOException {
+        super(primaryStream);
+        this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        try {
+            secondaryStream.write(b);
+        } catch (Exception ex) {
+            LOG.warn("Exception encountered during write to secondary stream");
+        }

Review Comment:
   If this class remains: I think such error handling might result in data loss on recovery.
   
   Ditto for the other write methods.
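   
   For illustration, a sketch of one alternative (`secondaryFailure` and `isSecondaryStreamHealthy` are hypothetical names, not the PR's API): record the first secondary failure, stop duplicating, and let the caller discard the local copy instead of recovering from a truncated file.
   
   ```java
   private IOException secondaryFailure; // set on the first failed write to the secondary stream
   
   @Override
   public void write(int b) throws IOException {
       outputStream.write(b);
       if (secondaryFailure == null) {
           try {
               secondaryStream.write(b);
           } catch (IOException ex) {
               // remember the failure so the local handle is never reported as usable
               secondaryFailure = ex;
               LOG.warn("Disabling secondary stream after write failure", ex);
           }
       }
       pos++;
   }
   
   /** The uploader could check this before handing out the local handle. */
   boolean isSecondaryStreamHealthy() {
       return secondaryFailure == null;
   }
   ```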



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##########
@@ -19,50 +19,30 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.util.clock.Clock;
-import org.apache.flink.util.clock.SystemClock;
 
 import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
 
 /**
  * A synchronous {@link StateChangeUploadScheduler} implementation that uploads the changes using
  * {@link FileSystem}.
  */
-public class StateChangeFsUploader implements StateChangeUploader {
+public class StateChangeFsUploader extends AbstractStateChangeFsUploader {

Review Comment:
   `AbstractStateChangeFsUploader` was extracted and is extended by `DuplicatingStateChangeFsUploader` and `StateChangeFsUploader`.
   It has some `protected` methods that are used by the descendants, but no `abstract` methods; and the calling methods in the descendants (`upload`) duplicate some logic.
   
   That suggests something is wrong with the design.
   How about applying the template method pattern: have `upload()` in the base class and some `abstract` methods to customize it (i.e. to create the stream and to handle errors)?
   
   (Alternatively, the two classes could just implement the interface directly; shared code could be factored out via composition or static methods.)
   
   WDYT?
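   
   For illustration, a rough sketch of the template-method variant (class and method names here are illustrative, not the PR's API):
   
   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   
   abstract class AbstractUploaderSketch {
   
       /** Shared upload flow; subclasses only customize stream creation and error handling. */
       public final void upload(byte[] changes) throws IOException {
           OutputStream out = createStream(); // customization point 1
           try {
               out.write(changes);            // the duplicated logic lives here, once
               out.flush();
           } catch (IOException e) {
               handleError(e);                // customization point 2
               throw e;
           } finally {
               out.close();
           }
       }
   
       protected abstract OutputStream createStream() throws IOException;
   
       protected abstract void handleError(IOException error);
   }
   ```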



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR;
+
+/** A StateChangeFsUploader implementation that writes the changes to both remote and local storage. */
+public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DuplicatingStateChangeFsUploader.class);
+
+    private final Path basePath;
+    private final FileSystem fileSystem;
+    private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;

Review Comment:
   Can be final.



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -329,14 +343,28 @@ public void confirm(SequenceNumber from, SequenceNumber to) {
         uploaded.subMap(from, to).values().stream()
                 .map(UploadResult::getStreamStateHandle)
                 .forEach(changelogRegistry::stopTracking);
+
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            LocalStateRegistry localStateRegistry =
+                    localRecoveryConfig
+                            .getLocalStateRegistry()
+                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
+
+            // transfer control of the localHandle to the localStateRegistry.
+            for (UploadResult result : uploaded.subMap(from, to).values()) {
+                changelogRegistry.stopTracking(result.localStreamHandle);
+                localStateRegistry.register(result.localStreamHandle, checkpointId);

Review Comment:
   Null check for `result.localStreamHandle`?
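   
   A sketch of the `confirm()` loop with such a guard, assuming `localStreamHandle` may legitimately be absent:
   
   ```java
   for (UploadResult result : uploaded.subMap(from, to).values()) {
       StreamStateHandle localHandle = result.localStreamHandle;
       if (localHandle != null) { // skip uploads that produced no local copy
           changelogRegistry.stopTracking(localHandle);
           localStateRegistry.register(localHandle, checkpointId);
       }
   }
   ```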
   



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream} which
+ * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The
+ * difference is that this stream does not delete the file on {@link #close()}.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+    private static final Logger LOG = LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+    private final OutputStream secondaryStream;
+
+    public DuplicatingOutputStreamWithPos(OutputStream primaryStream, OutputStream secondaryStream)
+            throws IOException {
+        super(primaryStream);
+        this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        try {
+            secondaryStream.write(b);
+        } catch (Exception ex) {
+            LOG.warn("Exception encountered during write to secondary stream");
+        }
+        pos++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        outputStream.write(b);
+        try {
+            secondaryStream.write(b);
+        } catch (Exception ex) {
+            LOG.warn("Exception encountered during write to secondary stream");
+        }
+        pos += b.length;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        try {
+            secondaryStream.write(b, off, len);
+        } catch (Exception ex) {
+            LOG.warn("Exception encountered during writing to secondary stream");
+        }
+        pos += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+        try {
+            secondaryStream.flush();
+        } catch (Exception ex) {
+            LOG.warn("Exception encountered during flushing secondary stream");
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        outputStream.close();
+        secondaryStream.close();

Review Comment:
   If this class remains: shouldn't both `close` calls be wrapped in `try`, or use a `Closer`?
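   
   For example, with the `Closer` already used elsewhere in this module (a sketch; `Closer` closes in LIFO order and rethrows the first failure while suppressing later ones, so both streams always get a close attempt):
   
   ```java
   @Override
   public void close() throws IOException {
       Closer closer = Closer.create();
       closer.register(secondaryStream); // closed last
       closer.register(outputStream);    // closed first
       closer.close();
   }
   ```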



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java:
##########
@@ -21,12 +21,15 @@
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Result of uploading state changes. */
 @Internal
 public final class UploadResult {
     public final StreamStateHandle streamStateHandle;
+    public final StreamStateHandle localStreamHandle;

Review Comment:
   Mark `@Nullable`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateHandleID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+public class ChangelogStateBackendLocalHandle implements ChangelogStateBackendHandle {
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(ChangelogStateBackendLocalHandle.class);
+    private final List<KeyedStateHandle> localMaterialized;
+    private final List<ChangelogStateHandle> localNonMaterialized;
+    private final ChangelogStateBackendHandleImpl remoteHandle;
+
+    public ChangelogStateBackendLocalHandle(
+            List<KeyedStateHandle> localMaterialized,
+            List<ChangelogStateHandle> localNonMaterialized,
+            ChangelogStateBackendHandleImpl remoteHandle) {
+        this.localMaterialized = localMaterialized;
+        this.localNonMaterialized = localNonMaterialized;
+        this.remoteHandle = remoteHandle;
+    }
+
+    @Override
+    public List<KeyedStateHandle> getMaterializedStateHandles() {
+        return localMaterialized;
+    }
+
+    @Override
+    public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
+        return localNonMaterialized;
+    }
+
+    @Override
+    public long getMaterializationID() {
+        return remoteHandle.getMaterializationID();
+    }
+
+    @Override
+    public ChangelogStateBackendHandle rebound(long checkpointId) {
+        return remoteHandle.rebound(checkpointId);
+    }
+
+    public List<KeyedStateHandle> getRemoteMaterializedStateHandles() {
+        return remoteHandle.getMaterializedStateHandles();
+    }
+
+    public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() {
+        return remoteHandle.getNonMaterializedStateHandles();
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return remoteHandle.getCheckpointId();
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointID) {
+        remoteHandle.registerSharedStates(stateRegistry, checkpointID);
+    }
+
+    @Override
+    public long getCheckpointedSize() {
+        return remoteHandle.getCheckpointedSize();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return remoteHandle.getKeyGroupRange();
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        throw new UnsupportedOperationException(
+                "This is a local state handle for the TM side only.");
+    }
+
+    @Override
+    public StateHandleID getStateHandleId() {
+        return remoteHandle.getStateHandleId();
+    }
+
+    @Override
+    public void discardState() throws Exception {}

Review Comment:
   :+1: IIUC, the discard logic is correct:
   1. `(Changelog)TaskLocalStateStore` calls this no-op method
   2. `LocalStateRegistry` calls `discardState()` on the "low-level" handle (`FileStateHandle`) on checkpoint confirmation
   3. `TaskChangelogRegistry` calls `discardState()` on the "low-level" handle (`FileStateHandle`) when the upload is no longer needed
   4. As opposed to "normal" local state, local changelog state is *not* discarded on recovery (because of 1), but it will be discarded on the first checkpoint confirmation
   
   Can you confirm that?
   
   I found it tricky to understand, though, so it makes sense to document it somewhere, WDYT?
   
   (And also to cover it with tests.)



##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java:
##########
@@ -329,14 +343,28 @@ public void confirm(SequenceNumber from, SequenceNumber to) {
         uploaded.subMap(from, to).values().stream()
                 .map(UploadResult::getStreamStateHandle)
                 .forEach(changelogRegistry::stopTracking);
+
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            LocalStateRegistry localStateRegistry =
+                    localRecoveryConfig
+                            .getLocalStateRegistry()
+                            .orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
+
+            // transfer control of the localHandle to the localStateRegistry.
+            for (UploadResult result : uploaded.subMap(from, to).values()) {
+                changelogRegistry.stopTracking(result.localStreamHandle);
+                localStateRegistry.register(result.localStreamHandle, checkpointId);
+            }
+            localStateRegistry.unRegister(checkpointId);

Review Comment:
   Just to clarify: we only retain local state for the last checkpoint (the number of retained checkpoints is ignored). Same as the current local recovery, right?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateRegistry.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** This registry manages handles which are written for local recovery. */
+public class LocalStateRegistry {

Review Comment:
   I'm thinking about the case where local changelog state explodes and causes `no space left on device`.
   The host might become unresponsive, and such an issue would be difficult to debug.
   
   To ease debugging, we should probably monitor the space taken by the changelog and stop writing locally, or throw an exception, when it takes too much (above some configured limit).
   This class seems the best place for such monitoring.
   
   This can be a separate ticket though, at least if local recovery is not the default.
   
   WDYT?
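   
   For illustration, a back-of-the-envelope sketch of such monitoring (the limit and all names here are hypothetical, not an existing Flink option):
   
   ```java
   import java.util.concurrent.atomic.AtomicLong;
   
   public class LocalStateSpaceGuard {
       private final long maxLocalStateBytes; // configured limit
       private final AtomicLong usedBytes = new AtomicLong();
   
       public LocalStateSpaceGuard(long maxLocalStateBytes) {
           this.maxLocalStateBytes = maxLocalStateBytes;
       }
   
       /** Could be called from LocalStateRegistry#register with StateObject#getStateSize(). */
       public void add(long sizeBytes) {
           long total = usedBytes.addAndGet(sizeBytes);
           if (total > maxLocalStateBytes) {
               // alternatively: log and degrade to remote-only recovery instead of failing
               throw new IllegalStateException(
                       "Local changelog state (" + total + " bytes) exceeds the configured limit");
           }
       }
   
       /** Could be called when a handle is discarded (e.g. from unRegister). */
       public void remove(long sizeBytes) {
           usedBytes.addAndGet(-sizeBytes);
       }
   }
   ```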
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org