Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/04 09:25:48 UTC

[GitHub] [flink] 1996fanrui opened a new pull request, #20151: [FLINK-26803][checkpoint] Merging channel state files

1996fanrui opened a new pull request, #20151:
URL: https://github.com/apache/flink/pull/20151

   ## What is the purpose of the change
   
   Merging channel state files to reduce the pressure on DFS.
   
   
   ## Brief change log
   
     - *Introduce the SharedChannelStateCheckpointStreamManager to control the number of channel state files.*
   
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added the SharedChannelStateCheckpointStreamManagerTest.*
     - *Added the PendingChannelStateOutputStreamTest.*
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper:  no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented?  docs
   




[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082023420


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   This is the code around `lock.wait()`. If we call `lock.notifyAll()`, `lock.wait()` returns immediately, but since `request` is still null the loop calls `lock.wait()` again and blocks. So `lock.notifyAll()` alone doesn't help; that's why I think `thread.interrupt()` is enough here.
   
   ```
       private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
           ChannelStateWriteRequest request;
           while (true) {
               request = deque.pollFirst();
               if (request == null) {
                   lock.wait();
               } else {
                   return request;
               }
           }
       }
   ```
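   
   For context, here is a minimal sketch of how the close path interacts with this loop (the field names follow the PR, but the bodies are illustrative only, not the actual Flink code):
   
   ```
       private void loop() throws Exception {
           while (!wasClosed) {
               try {
                   ChannelStateWriteRequest request;
                   synchronized (lock) {
                       // Blocks in lock.wait() while the deque is empty, so a plain
                       // notifyAll() without a new element just puts it back to sleep.
                       request = waitAndTakeUnsafe();
                   }
                   dispatcher.dispatch(request);
               } catch (InterruptedException e) {
                   // Before close() the interrupt is spurious and the loop keeps waiting;
                   // after close() it is the expected shutdown signal.
                   if (wasClosed) {
                       Thread.currentThread().interrupt();
                   }
               }
           }
       }
   
       void close() {
           wasClosed = true;
           thread.interrupt(); // wakes up lock.wait() inside waitAndTakeUnsafe()
       }
   ```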





[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1053286712


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Thanks for your suggestion, I have updated.
   
   I didn't call `lock.notifyAll()` during close because we already call `thread.interrupt()`. Please correct me if I'm wrong, thanks~





[GitHub] [flink] dawidwys commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
dawidwys commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082525394


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   1. Does it apply to unaligned checkpoints only? If so, could we put it under the `unaligned` subgroup, i.e. `execution.checkpointing.unaligned.(...)`?
   2. Do we plan to have more options under `channel-state`? If not, I would not introduce this subgroup.
   3. I like @pnowojski's idea more, but I'd add a `max` prefix: `max-subtasks-per-file`.
   
   Depending on 2., I'd go with either `channel-state.max-subtasks-per-file` or `max-subtasks-per-channel-state-file`.





[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052418421


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   > BlockingDeque.take() will wait until an element becomes available. Deque is hard to achieve. BlockingDeque is easy to implement the producer & consumer model.
   
   That's a slight complexity, but it should be easily solved via `lock.wait()` and `lock.notifyAll()` called in one or two places (`close()` and whenever we add anything to the current `deque`)? https://howtodoinjava.com/java/multi-threading/wait-notify-and-notifyall-methods/
   
   The loop probably should look like this:
   
   ```
   while (true) {
     synchronized (lock) {
       if (wasClosed) return;
       (...)
       ChannelStateWriteRequest request = waitAndTakeUnsafe();
       (...)
     }
   }
   
   private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
     ChannelStateWriteRequest request;
     while (true) {
       request = deque.pollFirst();
       if (request == null) {
         lock.wait();
       } else {
         return request;
       }
     }
   }
   ```
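   
   For completeness, the producer side of this suggestion could look roughly like the following (same assumed field names, a sketch rather than the actual PR code):
   
   ```
       void submit(ChannelStateWriteRequest request) {
           synchronized (lock) {
               deque.addLast(request);
               // Wake up the consumer that may be parked in lock.wait()
               // inside waitAndTakeUnsafe().
               lock.notifyAll();
           }
       }
   
       void close() {
           synchronized (lock) {
               wasClosed = true;
               lock.notifyAll(); // lets the loop observe wasClosed and return
           }
       }
   ```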





[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048286114


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map<InputChannelInfo, StateContentMetaInfo> inputChannelOffsets = new HashMap<>();
-    private final Map<ResultSubpartitionInfo, StateContentMetaInfo> resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set<SubtaskID> waitedSubtasks;
+
+    private final Map<SubtaskID, ChannelStatePendingResult> pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set<SubtaskID> subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete) {
-        this(
-                taskName,
-                subtaskIndex,
-                checkpointId,
-                result,
-                serializer,
-                onComplete,
-                stream,
-                new DataOutputStream(stream));
+        this(subtasks, checkpointId, serializer, onComplete, stream, new DataOutputStream(stream));
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
             DataOutputStream dataStream) {
-        this.taskName = taskName;
-        this.subtaskIndex = subtaskIndex;
+        checkArgument(!subtasks.isEmpty(), "The subtasks cannot be empty.");
+        this.waitedSubtasks = new HashSet<>(subtasks);

Review Comment:
   `waitedSubtasks` -> `subtasksToRegister`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStatePendingResult.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
+import org.apache.flink.runtime.state.InputChannelStateHandle;
+import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.UUID.randomUUID;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The pending result of channel state for a specific checkpoint-subtask. */
+public class ChannelStatePendingResult {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChannelStatePendingResult.class);
+
+    // Subtask information
+    private final int subtaskIndex;
+
+    private final long checkpointId;
+
+    // Result related
+    private final ChannelStateSerializer serializer;
+    private final ChannelStateWriter.ChannelStateWriteResult result;
+    private final Map<InputChannelInfo, AbstractChannelStateHandle.StateContentMetaInfo>
+            inputChannelOffsets = new HashMap<>();
+    private final Map<ResultSubpartitionInfo, AbstractChannelStateHandle.StateContentMetaInfo>
+            resultSubpartitionOffsets = new HashMap<>();
+    private boolean allInputsReceived = false;
+    private boolean allOutputsReceived = false;
+
+    public ChannelStatePendingResult(
+            int subtaskIndex,
+            long checkpointId,
+            ChannelStateWriter.ChannelStateWriteResult result,
+            ChannelStateSerializer serializer) {
+        this.subtaskIndex = subtaskIndex;
+        this.checkpointId = checkpointId;
+        this.result = result;
+        this.serializer = serializer;
+    }
+
+    public boolean isAllInputsReceived() {
+        return allInputsReceived;
+    }
+
+    public boolean isAllOutputsReceived() {
+        return allOutputsReceived;
+    }
+
+    public Map<InputChannelInfo, StateContentMetaInfo> getInputChannelOffsets() {
+        return inputChannelOffsets;
+    }
+
+    public Map<ResultSubpartitionInfo, StateContentMetaInfo> getResultSubpartitionOffsets() {
+        return resultSubpartitionOffsets;
+    }
+
+    void completeInput() {
+        LOG.debug("complete input, output completed: {}", allOutputsReceived);
+        checkArgument(!allInputsReceived);
+        allInputsReceived = true;
+    }
+
+    void completeOutput() {
+        LOG.debug("complete output, input completed: {}", allInputsReceived);
+        checkArgument(!allOutputsReceived);
+        allOutputsReceived = true;
+    }
+
+    public void finishResult(StreamStateHandle stateHandle) throws IOException {
+        if (inputChannelOffsets.isEmpty() && resultSubpartitionOffsets.isEmpty()) {
+            result.inputChannelStateHandles.complete(emptyList());
+            result.resultSubpartitionStateHandles.complete(emptyList());
+            return;
+        }

Review Comment:
   Does it make sense to optimise this case? I think that even without this if check, the `complete()` calls below would still work the same way?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -315,18 +325,73 @@ public void fail(Throwable e) {
         }
     }
 
-    private interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
-        H create(
-                int subtaskIndex,
-                I info,
-                StreamStateHandle underlying,
-                List<Long> offsets,
-                long size);
+    @Nonnull
+    private ChannelStatePendingResult getChannelStatePendingResult(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex) {
+        SubtaskID subtaskID = SubtaskID.of(jobID, jobVertexID, subtaskIndex);
+        ChannelStatePendingResult pendingResult = pendingResults.get(subtaskID);
+        checkNotNull(pendingResult, "The subtask[%s] is not registered yet", subtaskID);
+        return pendingResult;
+    }
+}
+
+/** A identification for subtask. */
+class SubtaskID {
+
+    private final JobID jobID;

Review Comment:
   Why do we even need the `JobID` here? All of the write requests should be coming from the same job.
   
   And if we really need it, why do we have to pass the `JobID` for every `writeInput`/`writeOutput` call? Should it be passed through the constructor? Now it suggests that a single writer has to handle writes from different jobs, which I hope is not the case 😅



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -90,33 +103,62 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     }
 
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
+        if (request instanceof SubtaskRegisterRequest) {
+            SubtaskRegisterRequest req = (SubtaskRegisterRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), req.getSubtaskIndex());
+            subtasks.add(subtaskID);

Review Comment:
   `subtasks` -> `registeredSubtasks`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Haven't we discussed somewhere that this (and the `deque` above) could be replaced with non-thread-safe versions and a single explicit lock, instead of having many different thread-safe primitives? Or was that for a different class 🤔?
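   
   A rough sketch of the single-lock variant discussed here (assumed declarations, illustrative only):
   
   ```
       // All of these are accessed only while holding `lock`, so the thread-safe
       // variants (LinkedBlockingDeque, ConcurrentHashMap, AtomicBoolean) are not needed.
       private final Object lock = new Object();
       private final Deque<ChannelStateWriteRequest> deque = new ArrayDeque<>();
       private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues = new HashMap<>();
       private boolean isRegistering = true;
   ```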



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map<InputChannelInfo, StateContentMetaInfo> inputChannelOffsets = new HashMap<>();
-    private final Map<ResultSubpartitionInfo, StateContentMetaInfo> resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set<SubtaskID> waitedSubtasks;
+
+    private final Map<SubtaskID, ChannelStatePendingResult> pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set<SubtaskID> subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete) {
-        this(
-                taskName,
-                subtaskIndex,
-                checkpointId,
-                result,
-                serializer,
-                onComplete,
-                stream,
-                new DataOutputStream(stream));
+        this(subtasks, checkpointId, serializer, onComplete, stream, new DataOutputStream(stream));
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
             DataOutputStream dataStream) {
-        this.taskName = taskName;
-        this.subtaskIndex = subtaskIndex;
+        checkArgument(!subtasks.isEmpty(), "The subtasks cannot be empty.");
+        this.waitedSubtasks = new HashSet<>(subtasks);
         this.checkpointId = checkpointId;
-        this.result = checkNotNull(result);
         this.checkpointStream = checkNotNull(checkpointStateOutputStream);
         this.serializer = checkNotNull(serializer);
         this.dataStream = checkNotNull(dataStream);
         this.onComplete = checkNotNull(onComplete);
         runWithChecks(() -> serializer.writeHeader(dataStream));
     }
 
-    void writeInput(InputChannelInfo info, Buffer buffer) {
-        write(
-                inputChannelOffsets,
-                info,
-                buffer,
-                !allInputsReceived,
-                "ChannelStateCheckpointWriter#writeInput");
+    void registerSubtaskResult(
+            SubtaskID subtaskID, ChannelStateWriter.ChannelStateWriteResult result) {
+        // The writer shouldn't register any subtask after writer has exception or is done,
+        checkState(!isDone(), "The write is done.");
+        Preconditions.checkState(
+                !pendingResults.containsKey(subtaskID),
+                "The subtask %s has already been register before.",
+                subtaskID);
+        waitedSubtasks.remove(subtaskID);
+
+        ChannelStatePendingResult pendingResult =
+                new ChannelStatePendingResult(
+                        subtaskID.getSubtaskIndex(), checkpointId, result, serializer);
+        pendingResults.put(subtaskID, pendingResult);
     }
 
-    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
-        write(
-                resultSubpartitionOffsets,
-                info,
-                buffer,
-                !allOutputsReceived,
-                "ChannelStateCheckpointWriter#writeOutput");
+    void releaseSubtask(SubtaskID subtaskID) throws Exception {
+        if (waitedSubtasks.remove(subtaskID)) {
+            tryFinishResult();

Review Comment:
   Why do we have to call `tryFinishResult` on release?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -90,33 +103,62 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     }
 
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {

Review Comment:
   This method has grown a bit too much. Could you split it into something like:
   
   ```
   if (isAbortedCheckpoint(...)) {
     handleAbortedRequest(...);
   }
   else if (request instanceof X) {
     handleRequestX(....);
   }
   (....)
   ```
   ?
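   
   A slightly more concrete sketch, reusing the request types from the diff (the handler names are hypothetical):
   
   ```
       private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
           if (request instanceof SubtaskRegisterRequest) {
               handleRegisterRequest((SubtaskRegisterRequest) request);
           } else if (request instanceof SubtaskReleaseRequest) {
               handleReleaseRequest((SubtaskReleaseRequest) request);
           } else if (isAbortedCheckpoint(request.getCheckpointId())) {
               handleAbortedRequest(request);
           } else if (request instanceof CheckpointStartRequest) {
               handleCheckpointStartRequest((CheckpointStartRequest) request);
           } else {
               handleOtherRequest(request); // remaining write/complete requests
           }
       }
   ```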
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -41,69 +46,123 @@
 import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 interface ChannelStateWriteRequest {
 
     Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequest.class);
 
+    JobID getJobID();
+
+    JobVertexID getJobVertexID();
+
+    int getSubtaskIndex();
+
     long getCheckpointId();
 
     void cancel(Throwable cause) throws Exception;
 
-    static CheckpointInProgressRequest completeInput(long checkpointId) {
+    CompletableFuture<?> getReadyFuture();
+
+    static CheckpointInProgressRequest completeInput(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex, long checkpointId) {

Review Comment:
   ditto about the need for JobID here



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -172,25 +257,80 @@ static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
     }
 }
 
-final class CheckpointStartRequest implements ChannelStateWriteRequest {
+abstract class AbstractChannelStateWriteRequest implements ChannelStateWriteRequest {
+
+    private final JobID jobID;
+
+    private final JobVertexID jobVertexID;
+
+    private final int subtaskIndex;
+
+    private final long checkpointId;
+
+    public AbstractChannelStateWriteRequest(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex, long checkpointId) {
+        this.jobID = jobID;
+        this.jobVertexID = jobVertexID;
+        this.subtaskIndex = subtaskIndex;
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public final JobID getJobID() {
+        return jobID;
+    }
+
+    @Override
+    public final JobVertexID getJobVertexID() {
+        return jobVertexID;
+    }
+
+    @Override
+    public final int getSubtaskIndex() {
+        return subtaskIndex;
+    }
+
+    @Override
+    public final long getCheckpointId() {
+        return checkpointId;
+    }
+
+    @Override
+    public CompletableFuture<?> getReadyFuture() {
+        return AvailabilityProvider.AVAILABLE;
+    }
+
+    @Override
+    public String toString() {

Review Comment:
   If you are already adding this super method, why don't you add the "name" parameter here as well? This way there would be no need for the subclasses to override this toString method.
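   
   For example (the `requestName` field is hypothetical, just to illustrate the idea):
   
   ```
       // The base class keeps a descriptive name so subclasses no longer need
       // their own toString().
       private final String requestName;
   
       @Override
       public final String toString() {
           return requestName
                   + "{jobVertexID=" + jobVertexID
                   + ", subtaskIndex=" + subtaskIndex
                   + ", checkpointId=" + checkpointId
                   + "}";
       }
   ```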



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -172,25 +257,80 @@ static ThrowingConsumer<Throwable, Exception> recycle(Buffer[] flinkBuffers) {
     }
 }
 
-final class CheckpointStartRequest implements ChannelStateWriteRequest {
+abstract class AbstractChannelStateWriteRequest implements ChannelStateWriteRequest {

Review Comment:
   Why do we need both the interface and the abstract class? Can we now just change the `ChannelStateWriteRequest` interface into an abstract class?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -41,69 +46,123 @@
 import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 interface ChannelStateWriteRequest {
 
     Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequest.class);
 
+    JobID getJobID();
+
+    JobVertexID getJobVertexID();
+
+    int getSubtaskIndex();
+
     long getCheckpointId();
 
     void cancel(Throwable cause) throws Exception;
 
-    static CheckpointInProgressRequest completeInput(long checkpointId) {
+    CompletableFuture<?> getReadyFuture();

Review Comment:
   Can you add a Javadoc explaining what this method does / what it is used for?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -152,14 +200,23 @@ private void failAndClearWriter(Throwable e) {
         writer = null;
     }
 
+    private void failAndClearWriter(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex, Throwable throwable) {
+        if (writer == null) {
+            return;
+        }
+        writer.fail(jobID, jobVertexID, subtaskIndex, throwable);
+        writer = null;
+    }

Review Comment:
   Why do we have two `failAndClearWriter` methods that behave differently? One is failing the `pendingResult`, the other is not? 🤔



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.channel;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.CheckpointStorage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** The factory of ChannelStateWriteRequestExecutor. */
+public class ChannelStateWriteRequestExecutorFactory {
+
+    private static final Map<JobID, ChannelStateWriteRequestExecutor> EXECUTORS = new HashMap<>();

Review Comment:
   Instead of having a singleton with static fields, can we inject a shared instance of `ChannelStateWriteRequestExecutorFactory`? For example, initialize and hold this instance in `TaskManagerServices`, pass it to `Task` and into the `StreamTask` through the `Environment`, and later down into `SubtaskCheckpointCoordinatorImpl#openChannelStateWriter`.
   
   (Such static/global variables can cause many problems down the line, for example in tests.)
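   
   A rough sketch of the non-static variant (the class name is from the PR, but the `getOrCreateExecutor` method and the wiring below are assumptions, not the actual change):
   
   ```
       /** One factory instance per TaskManager instead of a JVM-wide singleton. */
       public class ChannelStateWriteRequestExecutorFactory {
           private final Map<JobID, ChannelStateWriteRequestExecutor> executors = new HashMap<>();
   
           synchronized ChannelStateWriteRequestExecutor getOrCreateExecutor(
                   JobID jobID, Supplier<ChannelStateWriteRequestExecutor> creator) {
               return executors.computeIfAbsent(jobID, ignored -> creator.get());
           }
       }
   ```
   
   The instance would then be created in `TaskManagerServices` and handed down to `SubtaskCheckpointCoordinatorImpl#openChannelStateWriter` through `Task`, the `Environment` and the `StreamTask`, so tests can simply create their own isolated factory.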



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -90,33 +103,62 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     }
 
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
+        if (request instanceof SubtaskRegisterRequest) {
+            SubtaskRegisterRequest req = (SubtaskRegisterRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), req.getSubtaskIndex());
+            subtasks.add(subtaskID);
+            return;
+        } else if (request instanceof SubtaskReleaseRequest) {
+            SubtaskReleaseRequest req = (SubtaskReleaseRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), req.getSubtaskIndex());
+            subtasks.remove(subtaskID);
+            if (writer == null) {
+                return;
+            }
+            writer.releaseSubtask(subtaskID);
+            return;
+        }
         if (isAbortedCheckpoint(request.getCheckpointId())) {
-            if (request.getCheckpointId() == maxAbortedCheckpointId) {
+            if (request.getCheckpointId() != maxAbortedCheckpointId) {
+                request.cancel(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                return;
+            }
+
+            SubtaskID requestSubtask =
+                    SubtaskID.of(
+                            request.getJobID(),
+                            request.getJobVertexID(),
+                            request.getSubtaskIndex());
+            if (requestSubtask.equals(abortedSubtaskID)) {
                 request.cancel(abortedCause);
             } else {
-                request.cancel(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                request.cancel(
+                        new CheckpointException(
+                                CHANNEL_STATE_SHARED_STREAM_EXCEPTION, abortedCause));
             }
             return;
         }
 
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    request.getCheckpointId() > ongoingCheckpointId,
+                    request.getCheckpointId() >= ongoingCheckpointId,
                     String.format(
                             "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
                             ongoingCheckpointId, request));
-            failAndClearWriter(
-                    new IllegalStateException(
-                            String.format(
-                                    "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
-                                            + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
-                                            + "a bug due to currently not supported concurrent unaligned checkpoint.",
-                                    taskName,
-                                    subtaskIndex,
-                                    ongoingCheckpointId,
-                                    request.getCheckpointId())));
-            this.writer = buildWriter((CheckpointStartRequest) request);
-            this.ongoingCheckpointId = request.getCheckpointId();
+            if (request.getCheckpointId() > ongoingCheckpointId) {
+                // Clear the previous writer.
+                failAndClearWriter(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+            }
+            CheckpointStartRequest req = (CheckpointStartRequest) request;
+            if (writer == null) {

Review Comment:
   This cannot be `checkState(writer == null)`, because a single dispatcher will handle 5 `CheckpointStartRequest`s from 5 subtasks (assuming 5 subtasks are configured to share the same file?).
   
   If so, maybe add a comment explaining this?
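   
   For example, the branch could carry a comment along these lines (a sketch that reuses names from the diff; the body is reconstructed and may not match the PR exactly):
   
   ```
       CheckpointStartRequest req = (CheckpointStartRequest) request;
       if (writer == null) {
           // A single dispatcher serves every subtask that shares this channel state
           // file, so it receives one CheckpointStartRequest per subtask for the same
           // checkpoint: only the first request creates the writer, later ones reuse it.
           writer = buildWriter(req);
           ongoingCheckpointId = req.getCheckpointId();
       }
   ```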





[GitHub] [flink] flinkbot commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1173585785

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "fb29b1e59124cd2571e61dc1081199b46d2585f9",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "fb29b1e59124cd2571e61dc1081199b46d2585f9",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * fb29b1e59124cd2571e61dc1081199b46d2585f9 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049658956


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map<InputChannelInfo, StateContentMetaInfo> inputChannelOffsets = new HashMap<>();
-    private final Map<ResultSubpartitionInfo, StateContentMetaInfo> resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set<SubtaskID> waitedSubtasks;
+
+    private final Map<SubtaskID, ChannelStatePendingResult> pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set<SubtaskID> subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete) {
-        this(
-                taskName,
-                subtaskIndex,
-                checkpointId,
-                result,
-                serializer,
-                onComplete,
-                stream,
-                new DataOutputStream(stream));
+        this(subtasks, checkpointId, serializer, onComplete, stream, new DataOutputStream(stream));
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
             DataOutputStream dataStream) {
-        this.taskName = taskName;
-        this.subtaskIndex = subtaskIndex;
+        checkArgument(!subtasks.isEmpty(), "The subtasks cannot be empty.");
+        this.waitedSubtasks = new HashSet<>(subtasks);
         this.checkpointId = checkpointId;
-        this.result = checkNotNull(result);
         this.checkpointStream = checkNotNull(checkpointStateOutputStream);
         this.serializer = checkNotNull(serializer);
         this.dataStream = checkNotNull(dataStream);
         this.onComplete = checkNotNull(onComplete);
         runWithChecks(() -> serializer.writeHeader(dataStream));
     }
 
-    void writeInput(InputChannelInfo info, Buffer buffer) {
-        write(
-                inputChannelOffsets,
-                info,
-                buffer,
-                !allInputsReceived,
-                "ChannelStateCheckpointWriter#writeInput");
+    void registerSubtaskResult(
+            SubtaskID subtaskID, ChannelStateWriter.ChannelStateWriteResult result) {
+        // The writer shouldn't register any subtask after writer has exception or is done,
+        checkState(!isDone(), "The write is done.");
+        Preconditions.checkState(
+                !pendingResults.containsKey(subtaskID),
+                "The subtask %s has already been register before.",
+                subtaskID);
+        waitedSubtasks.remove(subtaskID);
+
+        ChannelStatePendingResult pendingResult =
+                new ChannelStatePendingResult(
+                        subtaskID.getSubtaskIndex(), checkpointId, result, serializer);
+        pendingResults.put(subtaskID, pendingResult);
     }
 
-    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
-        write(
-                resultSubpartitionOffsets,
-                info,
-                buffer,
-                !allOutputsReceived,
-                "ChannelStateCheckpointWriter#writeOutput");
+    void releaseSubtask(SubtaskID subtaskID) throws Exception {
+        if (waitedSubtasks.remove(subtaskID)) {
+            tryFinishResult();

Review Comment:
   `releaseSubtask` will be called when the subtask is finished or canceled. Technically, it is called when `SubtaskCheckpointCoordinatorImpl#cancel` is called.
   
   If the subtask is finished:
   - the writer shouldn't wait for that subtask any more, so we call `waitedSubtasks.remove(subtaskID)`;
   - if the checkpoints of all other subtasks of this writer are already completed and the writer is only waiting for this last subtask, then once the last subtask finishes the writer should be completed as well. That's why `tryFinishResult` is called here.
   
   





[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049851231


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Sorry, you mentioned it before.
   
   However, I encountered some difficulties while refactoring it:
   
   - `BlockingDeque.take()` waits until an element becomes available; that is hard to achieve with a plain `Deque`, whereas `BlockingDeque` makes the producer/consumer model easy to implement.
   - The `loop` method accesses these fields continuously; do you have any suggestions on how to refactor them using the lock?
   
   
   ```
   private void loop() throws Exception {
       while (!wasClosed) {
           try {
               ChannelStateWriteRequest request = deque.take();
               // The executor will end the registration, when the start request comes.
               // Because the checkpoint can be started after all tasks are initiated.
               if (request instanceof CheckpointStartRequest && isRegistering()) {
                   isRegistering.set(false);
               }
               dispatcher.dispatch(request);
           } catch (InterruptedException e) {
               if (!wasClosed) {
                   LOG.debug(
                           "Channel state executor is interrupted while waiting for a request (continue waiting)",
                           e);
               } else {
                   Thread.currentThread().interrupt();
               }
           }
       }
   }
   ```
   
   Sorry for not letting you know this before.





[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1356350601

   @flinkbot run azure




[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082600527


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   It's only for unaligned checkpoints. There is a possibility we will have more options, but not very likely. So maybe `execution.checkpointing.unaligned.max-subtasks-per-channel-state-file`?
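   
   For illustration, the renamed option might look like this (a sketch only, mirroring the builder calls from the diff above; the final name was still under discussion at this point):
   
   ```
   public static final ConfigOption<Integer> UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE =
           ConfigOptions.key(
                           "execution.checkpointing.unaligned.max-subtasks-per-channel-state-file")
                   .intType()
                   .defaultValue(5)
                   .withDescription(
                           "Defines the maximum number of subtasks that share the same channel state file. "
                                   + "It can reduce the number of small files when unaligned checkpoints are enabled. "
                                   + "Each subtask will create a new channel state file when this is configured to 1.");
   ```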



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082365656


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   > number-of-subtasks-share-file
   
   This sounds a bit strange in English. Maybe let's rename it to:
   
   > execution.checkpointing.channel-state.subtasks-per-file
   
   and rename the config option constant and getters as well? @dawidwys maybe you have a better idea?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082043383


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -107,49 +119,78 @@ void run() {
             try {
                 closeAll(
                         this::cleanupRequests,
-                        () ->
-                                dispatcher.fail(
-                                        thrown == null ? new CancellationException() : thrown));
+                        () -> {
+                            Throwable cause;
+                            synchronized (lock) {
+                                cause = thrown == null ? new CancellationException() : thrown;
+                            }
+                            dispatcher.fail(cause);
+                        });
             } catch (Exception e) {
-                //noinspection NonAtomicOperationOnVolatileField
-                thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                synchronized (lock) {
+                    //noinspection NonAtomicOperationOnVolatileField
+                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                }
             }
             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
         }
         LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
-        while (!wasClosed) {
+        while (true) {
             try {
-                ChannelStateWriteRequest request = deque.take();
-                // The executor will end the registration, when the start request comes.
-                // Because the checkpoint can be started after all tasks are initiated.
-                if (request instanceof CheckpointStartRequest && isRegistering()) {
-                    checkState(
-                            isRegistering.compareAndSet(true, false),
-                            "Transition isRegistering failed.");
+                ChannelStateWriteRequest request;
+                boolean completeRegister = false;
+                synchronized (lock) {
+                    if (wasClosed) {
+                        return;
+                    }
+                    request = waitAndTakeUnsafe();
+                    // The executor will end the registration, when the start request comes.
+                    // Because the checkpoint can be started after all tasks are initiated.
+                    if (request instanceof CheckpointStartRequest) {
+                        completeRegister = completeRegister();
+                    }
+                }
+                if (completeRegister) {
                     onRegistered.accept(this);
                 }
                 dispatcher.dispatch(request);
             } catch (InterruptedException e) {
-                if (!wasClosed) {
-                    LOG.debug(
-                            "Channel state executor is interrupted while waiting for a request (continue waiting)",
-                            e);
-                } else {
-                    Thread.currentThread().interrupt();
+                synchronized (lock) {
+                    if (!wasClosed) {
+                        LOG.debug(
+                                "Channel state executor is interrupted while waiting for a request (continue waiting)",
+                                e);
+                    } else {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
     }
 
+    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
+        ChannelStateWriteRequest request;
+        while (true) {

Review Comment:
   Thanks for your review, updated.
   
   I didn't squash the commits; I added a new fixup commit instead, so it should be easy to review. I can squash them and rebase on master once you think it's ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082043383


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -107,49 +119,78 @@ void run() {
             try {
                 closeAll(
                         this::cleanupRequests,
-                        () ->
-                                dispatcher.fail(
-                                        thrown == null ? new CancellationException() : thrown));
+                        () -> {
+                            Throwable cause;
+                            synchronized (lock) {
+                                cause = thrown == null ? new CancellationException() : thrown;
+                            }
+                            dispatcher.fail(cause);
+                        });
             } catch (Exception e) {
-                //noinspection NonAtomicOperationOnVolatileField
-                thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                synchronized (lock) {
+                    //noinspection NonAtomicOperationOnVolatileField
+                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                }
             }
             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
         }
         LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
-        while (!wasClosed) {
+        while (true) {
             try {
-                ChannelStateWriteRequest request = deque.take();
-                // The executor will end the registration, when the start request comes.
-                // Because the checkpoint can be started after all tasks are initiated.
-                if (request instanceof CheckpointStartRequest && isRegistering()) {
-                    checkState(
-                            isRegistering.compareAndSet(true, false),
-                            "Transition isRegistering failed.");
+                ChannelStateWriteRequest request;
+                boolean completeRegister = false;
+                synchronized (lock) {
+                    if (wasClosed) {
+                        return;
+                    }
+                    request = waitAndTakeUnsafe();
+                    // The executor will end the registration, when the start request comes.
+                    // Because the checkpoint can be started after all tasks are initiated.
+                    if (request instanceof CheckpointStartRequest) {
+                        completeRegister = completeRegister();
+                    }
+                }
+                if (completeRegister) {
                     onRegistered.accept(this);
                 }
                 dispatcher.dispatch(request);
             } catch (InterruptedException e) {
-                if (!wasClosed) {
-                    LOG.debug(
-                            "Channel state executor is interrupted while waiting for a request (continue waiting)",
-                            e);
-                } else {
-                    Thread.currentThread().interrupt();
+                synchronized (lock) {
+                    if (!wasClosed) {
+                        LOG.debug(
+                                "Channel state executor is interrupted while waiting for a request (continue waiting)",
+                                e);
+                    } else {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
     }
 
+    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
+        ChannelStateWriteRequest request;
+        while (true) {

Review Comment:
   Thanks for your review, updated.
   
   I didn't squash the commits; I added a new fixup commit instead, so it should be easy to review. I can squash them once you think it's ok.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052295272


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -18,280 +18,265 @@
 package org.apache.flink.runtime.checkpoint.channel;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
-import org.apache.flink.runtime.state.AbstractChannelStateHandle;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.AbstractChannelStateHandle.StateContentMetaInfo;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.InputChannelStateHandle;
-import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
+import java.util.Objects;
+import java.util.Set;
 
-import static java.util.Collections.emptyList;
-import static java.util.Collections.singletonList;
-import static java.util.UUID.randomUUID;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Writes channel state for a specific checkpoint-subtask-attempt triple. */
+/** Writes channel state for multiple subtasks of the same checkpoint. */
 @NotThreadSafe
 class ChannelStateCheckpointWriter {
     private static final Logger LOG = LoggerFactory.getLogger(ChannelStateCheckpointWriter.class);
 
     private final DataOutputStream dataStream;
     private final CheckpointStateOutputStream checkpointStream;
-    private final ChannelStateWriteResult result;
-    private final Map<InputChannelInfo, StateContentMetaInfo> inputChannelOffsets = new HashMap<>();
-    private final Map<ResultSubpartitionInfo, StateContentMetaInfo> resultSubpartitionOffsets =
-            new HashMap<>();
+
+    /**
+     * Indicates whether the current checkpoints of all subtasks have exception. If it's not null,
+     * the checkpoint will fail.
+     */
+    private Throwable throwable;
+
     private final ChannelStateSerializer serializer;
     private final long checkpointId;
-    private boolean allInputsReceived = false;
-    private boolean allOutputsReceived = false;
     private final RunnableWithException onComplete;
-    private final int subtaskIndex;
-    private final String taskName;
+
+    // Subtasks that have not yet register writer result.
+    private final Set<SubtaskID> waitedSubtasks;
+
+    private final Map<SubtaskID, ChannelStatePendingResult> pendingResults = new HashMap<>();
 
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
-            CheckpointStartRequest startCheckpointItem,
+            Set<SubtaskID> subtasks,
+            long checkpointId,
             CheckpointStreamFactory streamFactory,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete)
             throws Exception {
         this(
-                taskName,
-                subtaskIndex,
-                startCheckpointItem.getCheckpointId(),
-                startCheckpointItem.getTargetResult(),
+                subtasks,
+                checkpointId,
                 streamFactory.createCheckpointStateOutputStream(EXCLUSIVE),
                 serializer,
                 onComplete);
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             CheckpointStateOutputStream stream,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete) {
-        this(
-                taskName,
-                subtaskIndex,
-                checkpointId,
-                result,
-                serializer,
-                onComplete,
-                stream,
-                new DataOutputStream(stream));
+        this(subtasks, checkpointId, serializer, onComplete, stream, new DataOutputStream(stream));
     }
 
     @VisibleForTesting
     ChannelStateCheckpointWriter(
-            String taskName,
-            int subtaskIndex,
+            Set<SubtaskID> subtasks,
             long checkpointId,
-            ChannelStateWriteResult result,
             ChannelStateSerializer serializer,
             RunnableWithException onComplete,
             CheckpointStateOutputStream checkpointStateOutputStream,
             DataOutputStream dataStream) {
-        this.taskName = taskName;
-        this.subtaskIndex = subtaskIndex;
+        checkArgument(!subtasks.isEmpty(), "The subtasks cannot be empty.");
+        this.waitedSubtasks = new HashSet<>(subtasks);
         this.checkpointId = checkpointId;
-        this.result = checkNotNull(result);
         this.checkpointStream = checkNotNull(checkpointStateOutputStream);
         this.serializer = checkNotNull(serializer);
         this.dataStream = checkNotNull(dataStream);
         this.onComplete = checkNotNull(onComplete);
         runWithChecks(() -> serializer.writeHeader(dataStream));
     }
 
-    void writeInput(InputChannelInfo info, Buffer buffer) {
-        write(
-                inputChannelOffsets,
-                info,
-                buffer,
-                !allInputsReceived,
-                "ChannelStateCheckpointWriter#writeInput");
+    void registerSubtaskResult(
+            SubtaskID subtaskID, ChannelStateWriter.ChannelStateWriteResult result) {
+        // The writer shouldn't register any subtask after writer has exception or is done,
+        checkState(!isDone(), "The write is done.");
+        Preconditions.checkState(
+                !pendingResults.containsKey(subtaskID),
+                "The subtask %s has already been register before.",
+                subtaskID);
+        waitedSubtasks.remove(subtaskID);
+
+        ChannelStatePendingResult pendingResult =
+                new ChannelStatePendingResult(
+                        subtaskID.getSubtaskIndex(), checkpointId, result, serializer);
+        pendingResults.put(subtaskID, pendingResult);
     }
 
-    void writeOutput(ResultSubpartitionInfo info, Buffer buffer) {
-        write(
-                resultSubpartitionOffsets,
-                info,
-                buffer,
-                !allOutputsReceived,
-                "ChannelStateCheckpointWriter#writeOutput");
+    void releaseSubtask(SubtaskID subtaskID) throws Exception {
+        if (waitedSubtasks.remove(subtaskID)) {
+            tryFinishResult();

Review Comment:
   👍 Could you maybe explain this in a javadoc?
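   
   For example, a javadoc along the lines of the earlier explanation could look like this (suggested wording only, attached to the snippet from the diff above; not the merged code):
   
   ```
   /**
    * Releases a subtask that shares this writer. If this was the last subtask the writer was
    * still waiting for, the shared channel state file can be finished, hence the call to
    * tryFinishResult().
    */
   void releaseSubtask(SubtaskID subtaskID) throws Exception {
       if (waitedSubtasks.remove(subtaskID)) {
           tryFinishResult();
       }
   }
   ```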



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1281929633

   Hi @pnowojski , please help take a look in your free time, thanks a lot.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1033701473


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java:
##########
@@ -119,15 +135,19 @@ public void testCleanup() throws IOException {
     public void testIgnoresInterruptsWhileRunning() throws Exception {
         TestRequestDispatcher requestProcessor = new TestRequestDispatcher();
         LinkedBlockingDeque<ChannelStateWriteRequest> deque = new LinkedBlockingDeque<>();
-        try (ChannelStateWriteRequestExecutorImpl worker =
-                new ChannelStateWriteRequestExecutorImpl(TASK_NAME, requestProcessor, deque)) {
+        ChannelStateWriteRequestExecutorImpl worker =
+                new ChannelStateWriteRequestExecutorImpl(requestProcessor, deque, JOB_ID, 5);

Review Comment:
   Hi @pnowojski , I'm not sure whether these `channel state` classes should be moved to the `flink-streaming-java` module.
   
   All checkpoint-related configurations are defined in the `flink-streaming-java` module, while these `channel state` classes are defined in the `flink-runtime` module.
   
   These classes didn't read any configuration before, so they were fine. However, it's hard to read some of the configurations in unit tests because `flink-runtime` doesn't depend on the `flink-streaming-java` module.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082041845


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Sorry, after addressing your next [comment](https://github.com/apache/flink/pull/20151#discussion_r1081119337), I found we can add the `lock.notifyAll()` during close.
   
   When the executor is closed, the while loop inside `waitAndTakeUnsafe` finishes, `waitAndTakeUnsafe` returns `null`, and the `executor thread` can terminate.
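   
   For illustration, a minimal sketch of that behaviour, assuming the method is invoked while holding `lock` as in the surrounding loop (not necessarily the exact merged code):
   
   ```
   private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
       while (!wasClosed) {
           ChannelStateWriteRequest request = deque.pollFirst();
           if (request != null) {
               return request;
           }
           // close() sets wasClosed under the lock and calls lock.notifyAll(),
           // so this wait() wakes up and the loop exits, returning null below.
           lock.wait();
       }
       return null;
   }
   ```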



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048154979


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
   I don't see that this has been actually fixed? Note, most likely also the config option should be renamed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048276628


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
   Sorry, I just changed the description, I will change the config key later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1286649770

   > As I understand, you mean that multiple ChannelStateWriterImpl share the same
   > ChannelStateWriteRequestExecutorImpl. When channel-state.number-of-tasks-share-file=5, each thread is responsible
   > for writing the channel state file for 5 subtasks, right? Since the file is written in a single thread, there is no need to consider thread safety issues.
   
   Yes, that's what I had in mind.
   
   > Your proposal should be clearer. I will try to refactor the code according to your proposal. Thanks again~
   
   Great! But please keep in mind that I haven't thought it fully through and I haven't tried to implement it myself, so if you encounter some obstacles, feel free to reach out to me before going too deep!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082041845


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Sorry, after addressing your next [comment](https://github.com/apache/flink/pull/20151#discussion_r1081119337), I found we can add the `lock.notifyAll()` here.
   
   When the executor is closed, the while loop inside `waitAndTakeUnsafe` finishes, `waitAndTakeUnsafe` returns `null`, and the `executor thread` can terminate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1398086486

   👍 Thanks, it looks good. Once you squash fixup commits I will do the last pass and hopefully merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049675221


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -315,18 +325,73 @@ public void fail(Throwable e) {
         }
     }
 
-    private interface HandleFactory<I, H extends AbstractChannelStateHandle<I>> {
-        H create(
-                int subtaskIndex,
-                I info,
-                StreamStateHandle underlying,
-                List<Long> offsets,
-                long size);
+    @Nonnull
+    private ChannelStatePendingResult getChannelStatePendingResult(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex) {
+        SubtaskID subtaskID = SubtaskID.of(jobID, jobVertexID, subtaskIndex);
+        ChannelStatePendingResult pendingResult = pendingResults.get(subtaskID);
+        checkNotNull(pendingResult, "The subtask[%s] is not registered yet", subtaskID);
+        return pendingResult;
+    }
+}
+
+/** A identification for subtask. */
+class SubtaskID {
+
+    private final JobID jobID;

Review Comment:
   > All of the write requests should be coming from the same job.
   
   Yes, you're right. I think it's not necessary. I will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1356266198

   > Thanks for your patience @1996fanrui , I've looked through most of the production code and I've left some comments. But I think I haven't spotted anything major.
   
   Hi @pnowojski , thanks a lot for your review, I have updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1356316350

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049831635


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -152,14 +200,23 @@ private void failAndClearWriter(Throwable e) {
         writer = null;
     }
 
+    private void failAndClearWriter(
+            JobID jobID, JobVertexID jobVertexID, int subtaskIndex, Throwable throwable) {
+        if (writer == null) {
+            return;
+        }
+        writer.fail(jobID, jobVertexID, subtaskIndex, throwable);
+        writer = null;
+    }

Review Comment:
   >  One is failing the pendingResult the other is not? 🤔
   
   No, both of them fail all `pendingResults`. I will add some comments to the method.
   
   ```
       /**
        * The throwable is just used for special subtask, other subtasks should fail by
        * {@link CHANNEL_STATE_SHARED_STREAM_EXCEPTION}
        */
       public void fail(JobVertexID jobVertexID, int subtaskIndex, Throwable throwable) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1048554412


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
   Updated.
   
   BTW, I squashed the commits this time because I rebased onto the master branch and fixed some conflicts. I will use separate commits when addressing comments in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082757960


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   Sounds good to me.
   
   Thanks for the suggestion, I will update it asap.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081118070


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -51,27 +66,40 @@ class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestEx
     private final Thread thread;
     private volatile Exception thrown = null;
     private volatile boolean wasClosed = false;
-    private final String taskName;
+
+    private final Map<SubtaskID, Queue<ChannelStateWriteRequest>> unreadyQueues =
+            new ConcurrentHashMap<>();
+
+    private final JobID jobID;
+    private final Set<SubtaskID> subtasks;
+    private final AtomicBoolean isRegistering = new AtomicBoolean(true);
+    private final int numberOfSubtasksShareFile;

Review Comment:
   Hmmm, I'm not sure. Maybe it's sufficient to have just the interrupt. On the other hand, does it hurt if we add the `lock.notifyAll()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski merged pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by "pnowojski (via GitHub)" <gi...@apache.org>.
pnowojski merged PR #20151:
URL: https://github.com/apache/flink/pull/20151


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1000549678


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequest.java:
##########
@@ -205,9 +207,17 @@ public CheckpointStorageLocationReference getLocationReference() {
         return locationReference;
     }
 
+    public void registerCancelCallback(Consumer<Throwable> cancelCallback) {
+        this.cancelCallback = cancelCallback;
+    }
+
     @Override
     public void cancel(Throwable cause) {
-        targetResult.fail(cause);
+        if (cancelCallback == null) {
+            targetResult.fail(cause);
+            return;
+        }
+        cancelCallback.accept(cause);

Review Comment:
   Why are we not failing the `targetResult` if the `cancelCallback` is set? This seems strange.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -252,4 +252,13 @@ public class ExecutionCheckpointingOptions {
                                                     "{{.Site.BaseURL}}{{.Site.LanguagePrefix}}/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta",
                                                     "the important considerations"))
                                     .build());
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_TASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-tasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of tasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each task will create a new channel state file when this is configured to 1.");

Review Comment:
   `task` -> `subtask`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1329031528

   Hi @pnowojski , I have updated the PR according to your great suggestion.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1049806879


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java:
##########
@@ -90,33 +103,62 @@ public void dispatch(ChannelStateWriteRequest request) throws Exception {
     }
 
     private void dispatchInternal(ChannelStateWriteRequest request) throws Exception {
+        if (request instanceof SubtaskRegisterRequest) {
+            SubtaskRegisterRequest req = (SubtaskRegisterRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), req.getSubtaskIndex());
+            subtasks.add(subtaskID);
+            return;
+        } else if (request instanceof SubtaskReleaseRequest) {
+            SubtaskReleaseRequest req = (SubtaskReleaseRequest) request;
+            SubtaskID subtaskID =
+                    SubtaskID.of(req.getJobID(), req.getJobVertexID(), req.getSubtaskIndex());
+            subtasks.remove(subtaskID);
+            if (writer == null) {
+                return;
+            }
+            writer.releaseSubtask(subtaskID);
+            return;
+        }
         if (isAbortedCheckpoint(request.getCheckpointId())) {
-            if (request.getCheckpointId() == maxAbortedCheckpointId) {
+            if (request.getCheckpointId() != maxAbortedCheckpointId) {
+                request.cancel(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                return;
+            }
+
+            SubtaskID requestSubtask =
+                    SubtaskID.of(
+                            request.getJobID(),
+                            request.getJobVertexID(),
+                            request.getSubtaskIndex());
+            if (requestSubtask.equals(abortedSubtaskID)) {
                 request.cancel(abortedCause);
             } else {
-                request.cancel(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+                request.cancel(
+                        new CheckpointException(
+                                CHANNEL_STATE_SHARED_STREAM_EXCEPTION, abortedCause));
             }
             return;
         }
 
         if (request instanceof CheckpointStartRequest) {
             checkState(
-                    request.getCheckpointId() > ongoingCheckpointId,
+                    request.getCheckpointId() >= ongoingCheckpointId,
                     String.format(
                             "Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.",
                             ongoingCheckpointId, request));
-            failAndClearWriter(
-                    new IllegalStateException(
-                            String.format(
-                                    "Task[name=%s, subtaskIndex=%s] has uncompleted channelState writer of checkpointId=%s, "
-                                            + "but it received a new checkpoint start request of checkpointId=%s, it maybe "
-                                            + "a bug due to currently not supported concurrent unaligned checkpoint.",
-                                    taskName,
-                                    subtaskIndex,
-                                    ongoingCheckpointId,
-                                    request.getCheckpointId())));
-            this.writer = buildWriter((CheckpointStartRequest) request);
-            this.ongoingCheckpointId = request.getCheckpointId();
+            if (request.getCheckpointId() > ongoingCheckpointId) {
+                // Clear the previous writer.
+                failAndClearWriter(new CheckpointException(CHECKPOINT_DECLINED_SUBSUMED));
+            }
+            CheckpointStartRequest req = (CheckpointStartRequest) request;
+            if (writer == null) {

Review Comment:
   > This can not be checkState(writer == null) because single dispatcher will handle 5 CheckpointStartRequests from 5 subtasks (assuming 5 subtasks are configured to share the same file?).
   
   Yes, totally right. I will add a comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] 1996fanrui commented on pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on PR #20151:
URL: https://github.com/apache/flink/pull/20151#issuecomment-1285706618

   > With a shared `ChannelStateWriteRequestExecutorImpl` there would be no need for the second level of the synchornisation. Making the code more efficient (due to fewer synchronisation and also keeping the one writing thread per file) and also simpler with fewer opportunities for race conditions/deadlocks.
   > 
   > WDYT? Am I missing something?
   
   Hi @pnowojski , thanks a lot for your thorough review.
   
   Currently, there is one `ChannelStateWriterImpl` per subtask and one `ChannelStateWriteRequestExecutorImpl` per `ChannelStateWriterImpl`. That is: each subtask has one thread to write the channel state file.
   
   As I understand, you mean that multiple `ChannelStateWriterImpl` share the same `ChannelStateWriteRequestExecutorImpl`. When `channel-state.number-of-tasks-share-file=5`, each thread is responsible for writing the channel state file for 5 subtasks, right? Since the file is written in a single thread, there is no need to consider thread safety issues. 
   
   Your proposal should be clearer. I will try to refactor the code according to your proposal. Thanks again~
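   
   To illustrate the idea, here is a minimal, self-contained sketch (hypothetical class and method names, not Flink's actual API): up to N subtasks obtain the same single-threaded executor, so one thread writes the shared channel state file and no extra synchronisation is needed on the write path.
   
   ```
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   
   // Hypothetical sketch: hands the same executor to up to `subtasksPerExecutor` subtasks.
   class SharedExecutorPool {
       private final int subtasksPerExecutor; // e.g. the value of channel-state.number-of-tasks-share-file
       private ExecutorService current;
       private int registered;
   
       SharedExecutorPool(int subtasksPerExecutor) {
           this.subtasksPerExecutor = subtasksPerExecutor;
       }
   
       // Called once per subtask; every `subtasksPerExecutor` registrations start a new executor.
       synchronized ExecutorService getOrCreateExecutor() {
           if (current == null || registered == subtasksPerExecutor) {
               current = Executors.newSingleThreadExecutor();
               registered = 0;
           }
           registered++;
           return current;
       }
   }
   ```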


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dawidwys commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
dawidwys commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1082743417


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java:
##########
@@ -280,4 +280,13 @@ public class ExecutionCheckpointingOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription("Flag to enable approximate local recovery.");
+
+    public static final ConfigOption<Integer> CHANNEL_STATE_NUMBER_OF_SUBTASKS_SHARE_FILE =
+            ConfigOptions.key("execution.checkpointing.channel-state.number-of-subtasks-share-file")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "Defines the maximum number of subtasks that share the same channel state file. "
+                                    + "It can reduce the number of small files when enable unaligned checkpoint. "
+                                    + "Each subtask will create a new channel state file when this is configured to 1.");

Review Comment:
   Makes sense to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1081119337


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.java:
##########
@@ -107,49 +119,78 @@ void run() {
             try {
                 closeAll(
                         this::cleanupRequests,
-                        () ->
-                                dispatcher.fail(
-                                        thrown == null ? new CancellationException() : thrown));
+                        () -> {
+                            Throwable cause;
+                            synchronized (lock) {
+                                cause = thrown == null ? new CancellationException() : thrown;
+                            }
+                            dispatcher.fail(cause);
+                        });
             } catch (Exception e) {
-                //noinspection NonAtomicOperationOnVolatileField
-                thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                synchronized (lock) {
+                    //noinspection NonAtomicOperationOnVolatileField
+                    thrown = ExceptionUtils.firstOrSuppressed(e, thrown);
+                }
             }
             FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
         }
         LOG.debug("loop terminated");
     }
 
     private void loop() throws Exception {
-        while (!wasClosed) {
+        while (true) {
             try {
-                ChannelStateWriteRequest request = deque.take();
-                // The executor will end the registration, when the start request comes.
-                // Because the checkpoint can be started after all tasks are initiated.
-                if (request instanceof CheckpointStartRequest && isRegistering()) {
-                    checkState(
-                            isRegistering.compareAndSet(true, false),
-                            "Transition isRegistering failed.");
+                ChannelStateWriteRequest request;
+                boolean completeRegister = false;
+                synchronized (lock) {
+                    if (wasClosed) {
+                        return;
+                    }
+                    request = waitAndTakeUnsafe();
+                    // The executor will end the registration, when the start request comes.
+                    // Because the checkpoint can be started after all tasks are initiated.
+                    if (request instanceof CheckpointStartRequest) {
+                        completeRegister = completeRegister();
+                    }
+                }
+                if (completeRegister) {
                     onRegistered.accept(this);
                 }
                 dispatcher.dispatch(request);
             } catch (InterruptedException e) {
-                if (!wasClosed) {
-                    LOG.debug(
-                            "Channel state executor is interrupted while waiting for a request (continue waiting)",
-                            e);
-                } else {
-                    Thread.currentThread().interrupt();
+                synchronized (lock) {
+                    if (!wasClosed) {
+                        LOG.debug(
+                                "Channel state executor is interrupted while waiting for a request (continue waiting)",
+                                e);
+                    } else {
+                        Thread.currentThread().interrupt();
+                    }
                 }
             }
         }
     }
 
+    private ChannelStateWriteRequest waitAndTakeUnsafe() throws InterruptedException {
+        ChannelStateWriteRequest request;
+        while (true) {

Review Comment:
   shouldn't this be `while(!wasClosed)`?
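
   For context on the question, a self-contained sketch (not the PR's code) of what a `while (!wasClosed)` guard around the wait would give: a plain `close()` plus `notifyAll()` is then enough to wake and terminate the waiter, without relying on an interrupt. All names below are illustrative:

   ```
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.CancellationException;

   final class GuardedRequestQueue<T> {
       private final Object lock = new Object();
       private final Deque<T> deque = new ArrayDeque<>();
       private boolean wasClosed;

       T waitAndTake() throws InterruptedException {
           synchronized (lock) {
               while (!wasClosed) {
                   T request = deque.pollFirst();
                   if (request != null) {
                       return request;
                   }
                   lock.wait(); // releases the lock; wasClosed is re-checked on wakeup
               }
               throw new CancellationException("closed while waiting for a request");
           }
       }

       void close() {
           synchronized (lock) {
               wasClosed = true;
               lock.notifyAll(); // wakes the waiter without needing an interrupt
           }
       }
   }
   ```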



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] pnowojski commented on a diff in pull request #20151: [FLINK-26803][checkpoint] Merging channel state files

Posted by GitBox <gi...@apache.org>.
pnowojski commented on code in PR #20151:
URL: https://github.com/apache/flink/pull/20151#discussion_r1052319366


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriter.java:
##########
@@ -286,27 +277,32 @@ private void runWithChecks(RunnableWithException r) {
         }
     }
 
-    public void fail(JobID jobID, JobVertexID jobVertexID, int subtaskIndex, Throwable e) {
+    /**
+     * The throwable is just used for special subtask, other subtasks should fail by {@link
+     * CHANNEL_STATE_SHARED_STREAM_EXCEPTION}.

Review Comment:
   nit:
   
   > The throwable is just used for the specific subtask that triggered the failure. Other subtasks should fail with {@link CHANNEL_STATE_SHARED_STREAM_EXCEPTION}
   
   ?
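
   To make the intended semantics concrete, a small invented sketch (class and method names are hypothetical, not the PR's classes): the triggering subtask is failed with the real cause, while every other subtask sharing the stream is failed with the shared sentinel exception:

   ```
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;

   final class SharedStreamFailureExample {
       static final Exception CHANNEL_STATE_SHARED_STREAM_EXCEPTION =
               new Exception("The shared channel state stream failed for another subtask.");

       private final Map<Integer, CompletableFuture<Void>> resultsBySubtask = new HashMap<>();

       void register(int subtaskIndex) {
           resultsBySubtask.put(subtaskIndex, new CompletableFuture<>());
       }

       void fail(int triggeringSubtaskIndex, Throwable cause) {
           resultsBySubtask.forEach(
                   (subtaskIndex, future) ->
                           future.completeExceptionally(
                                   subtaskIndex == triggeringSubtaskIndex
                                           ? cause
                                           : CHANNEL_STATE_SHARED_STREAM_EXCEPTION));
       }
   }
   ```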



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org