Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/25 22:17:48 UTC

[GitHub] [flink] JingGe commented on a change in pull request #18428: [FLINK-25575] Add Sink V2 operators and translation

JingGe commented on a change in pull request #18428:
URL: https://github.com/apache/flink/pull/18428#discussion_r791758548



##########
File path: flink-architecture-tests/violations/5b9eed8a-5fb6-4373-98ac-3be2a71941b8
##########
@@ -93,7 +93,6 @@ org.apache.flink.connector.file.src.util.Pool.recycler(): Returned leaf type org
 org.apache.flink.connector.file.src.util.Utils.forEachRemaining(org.apache.flink.connector.file.src.reader.BulkFormat$Reader, java.util.function.Consumer): Argument leaf type org.apache.flink.connector.file.src.reader.BulkFormat$Reader does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions.builder(): Returned leaf type org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions$JDBCExactlyOnceOptionsBuilder does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
 org.apache.flink.connector.jdbc.JdbcExecutionOptions.builder(): Returned leaf type org.apache.flink.connector.jdbc.JdbcExecutionOptions$Builder does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated
-org.apache.flink.connector.jdbc.JdbcSink.exactlyOnceSink(java.lang.String, org.apache.flink.connector.jdbc.JdbcStatementBuilder, org.apache.flink.connector.jdbc.JdbcExecutionOptions, org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions, org.apache.flink.util.function.SerializableSupplier): Argument leaf type org.apache.flink.util.function.SerializableSupplier does not satisfy: reside outside of package 'org.apache.flink..' or annotated with @Public or annotated with @PublicEvolving or annotated with @Deprecated

Review comment:
       I think these changes should not be mixed into this PR.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/Committables.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Internal wrapper to handle the committing of committables.
+ *
+ * @param <CommT> type of the committable
+ */
+@Internal
+public interface Committables<CommT> {
+    /** Returns a summary of the current commit progress. */
+    CommittableSummary<CommT> getSummary();
+
+    /**
+     * Commits all due committables.
+     *
+     * @param fullyReceived only commit committables if all committables of this checkpoint for a

Review comment:
       This description reads like implementation logic. Instead, we should describe the purpose of this parameter at the interface level.
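
A purpose-level `@param` description might look like the sketch below. `CommittablesSketch` and `SingleCheckpointCommittables` are hypothetical, simplified stand-ins: the real method signature is cut off in the hunk, and the meaning of `fullyReceived` is assumed from the truncated original text ("only commit committables if all committables of this checkpoint ..."). The point is that the interface Javadoc states what the flag guarantees to the caller, while the toy implementation is free to decide how it checks completeness.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the reviewed Committables interface. */
interface CommittablesSketch<CommT> {
    /**
     * Commits the committables that are currently due.
     *
     * @param fullyReceived whether a checkpoint may only be committed after all committables
     *     announced for it have arrived ({@code true}), or whether the committables received so
     *     far may be committed right away ({@code false})
     * @return the committables committed by this call
     */
    List<CommT> commit(boolean fullyReceived);
}

/** Toy single-checkpoint implementation; the expected count would come from a summary. */
class SingleCheckpointCommittables<CommT> implements CommittablesSketch<CommT> {
    private final int expected;
    private final List<CommT> received = new ArrayList<>();

    SingleCheckpointCommittables(int expected) {
        this.expected = expected;
    }

    void add(CommT committable) {
        received.add(committable);
    }

    @Override
    public List<CommT> commit(boolean fullyReceived) {
        if (fullyReceived && received.size() < expected) {
            // The checkpoint is still incomplete, so nothing is due yet.
            return new ArrayList<>();
        }
        List<CommT> committed = new ArrayList<>(received);
        received.clear();
        return committed;
    }
}
```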

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {

Review comment:
       It is difficult to understand the difference between `getNumPending` and `getPendingRequests`, since both deal with committables: one via `CommittableSummary` and the other via `CommitRequestImpl`. I couldn't find a better name, so I would suggest adding Javadoc to make the distinction clear.
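
To illustrate the distinction, here is a hypothetical, reduced model of `SubtaskCommittables` (`SubtaskProgress` and its nested `Request` are illustrative names, and failures are left out for brevity). Judging from the full hunk further down, `getNumPending()` is computed from the summary's announced count, so it also covers committables that have not arrived yet, whereas `getPendingRequests()` only streams requests that were received but are not finished. The Javadoc wording is one possible way to spell that out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.stream.Stream;

/** Hypothetical simplified model of SubtaskCommittables, reduced to the two accessors. */
class SubtaskProgress {
    /** Simplified commit request that only tracks whether its commit attempt has finished. */
    static final class Request {
        boolean finished;
    }

    private final int announced; // what summary.getNumberOfCommittables() would return
    private final Deque<Request> requests = new ArrayDeque<>();
    private int numDrained;

    SubtaskProgress(int announced) {
        this.announced = announced;
    }

    Request add() {
        Request request = new Request();
        requests.add(request);
        return request;
    }

    /** Removes finished requests and counts them as drained. */
    void drainFinished() {
        requests.removeIf(
                request -> {
                    if (request.finished) {
                        numDrained++;
                        return true;
                    }
                    return false;
                });
    }

    /**
     * Number of committables announced by the summary that have not been drained yet. This
     * includes committables that have not even arrived at this subtask.
     */
    int getNumPending() {
        return announced - numDrained;
    }

    /**
     * Requests that have already arrived but whose commit has not finished yet. Committables that
     * are announced but still missing never appear here.
     */
    Stream<Request> getPendingRequests() {
        return requests.stream().filter(request -> !request.finished);
    }
}
```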

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;

Review comment:
       ```suggestion
       private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittablesMap;
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, long checkpointId) {

Review comment:
       `checkpointId` could be obtained from `SubtaskCommittables`.
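
A minimal sketch of deriving the id instead of passing it separately, assuming each subtask entry can report the checkpoint it belongs to (in the PR this would presumably come from its `CommittableSummary`). `SubtaskEntry` and `CheckpointEntries` are hypothetical stand-ins. One trade-off: an empty map cannot supply an id, so this variant rejects it, and it also checks that all subtasks agree.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for SubtaskCommittables that only exposes its checkpoint id. */
final class SubtaskEntry {
    private final long checkpointId;

    SubtaskEntry(long checkpointId) {
        this.checkpointId = checkpointId;
    }

    long getCheckpointId() {
        return checkpointId;
    }
}

/** Sketch of a CheckpointCommittablesImpl-like class without a separate checkpointId argument. */
final class CheckpointEntries {
    private final Map<Integer, SubtaskEntry> subtasks;
    private final long checkpointId;

    CheckpointEntries(Map<Integer, SubtaskEntry> subtasks) {
        if (subtasks.isEmpty()) {
            throw new IllegalArgumentException("cannot derive a checkpoint id from no subtasks");
        }
        this.subtasks = new HashMap<>(subtasks);
        Collection<SubtaskEntry> values = this.subtasks.values();
        this.checkpointId = values.iterator().next().getCheckpointId();
        // All subtasks must belong to the same checkpoint.
        for (SubtaskEntry entry : values) {
            if (entry.getCheckpointId() != checkpointId) {
                throw new IllegalArgumentException("subtasks belong to different checkpoints");
            }
        }
    }

    long getCheckpointId() {
        return checkpointId;
    }

    int getNumberOfSubtasks() {
        return subtasks.size();
    }
}
```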

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, long checkpointId) {
+        this.subtasksCommittables = checkNotNull(subtasksCommittables);
+        // dummy collector, never used
+        this.collector = new CommittableCollector<>(0, 1);
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    Collection<SubtaskCommittables<CommT>> getSubtasks() {
+        return subtasksCommittables.values();
+    }
+
+    void addSummary(CommittableSummary<CommT> summary) {

Review comment:
       This should actually be named `upsertSummary(...)`.
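
The name `upsertSummary` fits a method that inserts an entry for a subtask that has not reported yet and updates the existing entry otherwise. The sketch below shows one possible set of semantics under that assumption: replace the stored summary but keep the committables already received. `SummarySketch`, `SubtaskState` and `CheckpointState` are hypothetical simplifications (committables are plain `String`s), not the PR's classes, and the PR's actual update behaviour is not visible in this hunk.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simplified summary: the sending subtask and how many committables it announces. */
final class SummarySketch {
    final int subtaskId;
    final int numberOfCommittables;

    SummarySketch(int subtaskId, int numberOfCommittables) {
        this.subtaskId = subtaskId;
        this.numberOfCommittables = numberOfCommittables;
    }
}

/** Per-subtask state: the latest summary plus the committables received so far. */
final class SubtaskState {
    SummarySketch summary;
    final List<String> received = new ArrayList<>();

    SubtaskState(SummarySketch summary) {
        this.summary = summary;
    }
}

final class CheckpointState {
    private final Map<Integer, SubtaskState> subtasks = new HashMap<>();

    /**
     * Inserts a new entry if this subtask has not reported yet; otherwise replaces the stored
     * summary and keeps the committables that were already received.
     */
    void upsertSummary(SummarySketch summary) {
        SubtaskState existing =
                subtasks.putIfAbsent(summary.subtaskId, new SubtaskState(summary));
        if (existing != null) {
            existing.summary = summary;
        }
    }

    SubtaskState get(int subtaskId) {
        return subtasks.get(subtaskId);
    }
}
```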

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {

Review comment:
       Nit

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittables.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * This interface adds checkpoint meta information to the committable.
+ *
+ * @param <CommT> type of the committable
+ */
+@Internal
+public interface CheckpointCommittables<CommT> extends Committables<CommT> {

Review comment:
       From an OOP point of view, this interface introduces the checkpoint, which implies that `Committables` sits at a higher level and has nothing to do with checkpoints. In fact, however, `Committables` already deals with `CommittableSummary` and `CommittableWithLineage`, both of which can provide a checkpointId. If the intent is to distinguish between the two interfaces, more information is needed to describe when to use `Committables` and when to use `CheckpointCommittables`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {

Review comment:
       Please add Javadoc for this class.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */

Review comment:
       Javadoc is required to explain why `subtaskId` and `numberOfSubtasks` are not needed for deserialization. Please also add a reference to the class that handles the custom serialization/deserialization, i.e. `CommittableCollectorSerializer`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {
+        List<CommittableWithLineage<CommT>> committed = new ArrayList<>(requests.size());
+        for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
+                iterator.hasNext(); ) {
+            CommitRequestImpl<CommT> request = iterator.next();
+            if (!request.isFinished()) {
+                continue;
+            }
+            if (request.getState() == CommitRequestState.FAILED) {
+                numFailed += 1;

Review comment:
       Should this call `iterator.remove()`? Otherwise, failed committables will be counted twice.
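
The double counting follows from `getNumCommittables()` returning `requests.size() + numDrained + numFailed`: a failed request that stays in `requests` after `numFailed` is incremented is counted twice, and counted again on every later drain. The sketch below is a hypothetical, simplified model (`DrainState`, `DrainRequest` and `SubtaskDrainSketch` are illustrative names) of the suggested fix. The rest of `drainCommitted()` is cut off in the hunk, so it assumes that committed requests are also removed from the deque once they are counted in `numDrained`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical request states, loosely modelled on the CommitRequestState used in the hunk. */
enum DrainState {
    RECEIVED,
    COMMITTED,
    FAILED
}

final class DrainRequest<CommT> {
    final CommT committable;
    DrainState state = DrainState.RECEIVED;

    DrainRequest(CommT committable) {
        this.committable = committable;
    }

    boolean isFinished() {
        return state != DrainState.RECEIVED;
    }
}

final class SubtaskDrainSketch<CommT> {
    private final Deque<DrainRequest<CommT>> requests = new ArrayDeque<>();
    private int numDrained;
    private int numFailed;

    DrainRequest<CommT> add(CommT committable) {
        DrainRequest<CommT> request = new DrainRequest<>(committable);
        requests.add(request);
        return request;
    }

    /** Same formula as getNumCommittables() in the hunk above. */
    int getNumCommittables() {
        return requests.size() + numDrained + numFailed;
    }

    List<CommT> drainCommitted() {
        List<CommT> committed = new ArrayList<>();
        for (Iterator<DrainRequest<CommT>> iterator = requests.iterator(); iterator.hasNext(); ) {
            DrainRequest<CommT> request = iterator.next();
            if (!request.isFinished()) {
                continue;
            }
            // Remove every finished request, including failed ones, so that it is counted only
            // once: from now on via numDrained or numFailed instead of requests.size().
            iterator.remove();
            if (request.state == DrainState.FAILED) {
                numFailed += 1;
            } else {
                numDrained += 1;
                committed.add(request.committable);
            }
        }
        return committed;
    }
}
```

Draining twice in a row is a quick way to check the invariant: the total returned by `getNumCommittables()` should stay the same.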

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/Committables.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Internal wrapper to handle the committing of committables.

Review comment:
       It seems that the word "committables" now has two meanings, since the new interface `Committables` has been created. AFAIK, "committables" has previously been used very often as a conceptual name. There is no corresponding domain entity for it; it is left open for users to define via the generic type `CommT`.
   
       Since all methods in this interface work with subtypes of `CommittableMessage`, this interface is actually responsible for managing all committables and all kinds of `CommittableMessage`, i.e. `CommittableSummary` and `CommittableWithLineage`. Naming this interface `CommittablesManager` would therefore be another option that makes the concept behind it easier to understand.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;

Review comment:
       Add a single-line comment for the key, e.g. `// key -> checkpointId`.
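
The key matters because the map is a `NavigableMap` and `EOI` is `Long.MAX_VALUE`, so end-of-input committables sort after every real checkpoint and a range view over the keys selects "everything up to checkpoint N". The sketch below is a hypothetical `CheckpointIndexSketch` with `String` in place of `CommT`; it only illustrates the ordering and is not a claim about how `CommittableCollector` actually queries its map:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical illustration of a checkpoint-indexed committable map. */
final class CheckpointIndexSketch {
    /** End-of-input marker, mirroring EOI = Long.MAX_VALUE in CommittableCollector. */
    static final long EOI = Long.MAX_VALUE;

    // key -> checkpointId; value -> committables collected for that checkpoint
    private final NavigableMap<Long, List<String>> checkpointCommittables = new TreeMap<>();

    void add(long checkpointId, String committable) {
        checkpointCommittables
                .computeIfAbsent(checkpointId, id -> new ArrayList<>())
                .add(committable);
    }

    /** Returns the committables of all checkpoints with an ID <= checkpointId, in order. */
    List<String> upTo(long checkpointId) {
        List<String> result = new ArrayList<>();
        checkpointCommittables.headMap(checkpointId, true).values().forEach(result::addAll);
        return result;
    }
}
```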

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables;

Review comment:
       ```suggestion
       private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittablesMap;
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {

Review comment:
       I think Javadoc is required for this method. Will this method be called multiple times? How are the numbers kept in sync with the `summary`, i.e. `hasReceivedAll()`? Will failed requests be retried?
   
   There are many numbers defined in this class whose relationships need a more detailed description:
   1. total number of committables in `summary`
   2. total number of `CommitRequestImpl` in `requests`
      2.1 number of unfinished requests
      2.2 number of failed requests
      2.3 number of requests that will be drained and removed
   3. `numFailed`, accumulated if this method is called multiple times
   4. `numDrained`, accumulated if this method is called multiple times
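
One way to answer these questions is to state the relationships as documented invariants. The sketch below uses a hypothetical `CounterSketch` with plain `int` fields, not Flink's class, and assumes that finished requests (committed or failed) are removed from the deque when drained, as proposed in the `iterator.remove()` comment. Under that assumption, once `hasReceivedAll()` holds, `pending()` equals the number of requests still held:

```java
/**
 * Hypothetical documentation of the counters in SubtaskCommittables.
 *
 * <p>Invariants assumed by this sketch:
 *
 * <ul>
 *   <li>{@code numDrained} and {@code numFailed} only grow, accumulating over repeated drains;
 *   <li>{@code numRequests} counts requests still held, finished or not;
 *   <li>{@code numRequests + numDrained + numFailed <= expectedTotal}, with equality once every
 *       committable announced by the summary has been received.
 * </ul>
 */
final class CounterSketch {
    final int expectedTotal; // number of committables announced by the summary
    int numRequests; // requests currently held in the deque
    int numDrained; // committed and removed, accumulated across drains
    int numFailed; // failed and removed, accumulated across drains

    CounterSketch(int expectedTotal) {
        this.expectedTotal = expectedTotal;
    }

    int received() {
        return numRequests + numDrained + numFailed;
    }

    boolean hasReceivedAll() {
        return received() == expectedTotal;
    }

    /** Committables that are neither drained nor failed, including ones not yet received. */
    int pending() {
        return expectedTotal - (numDrained + numFailed);
    }

    void checkInvariants() {
        if (numRequests < 0 || numDrained < 0 || numFailed < 0) {
            throw new IllegalStateException("negative counter");
        }
        if (received() > expectedTotal) {
            throw new IllegalStateException(
                    "received " + received() + " committables, summary announced " + expectedTotal);
        }
    }
}
```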

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;

Review comment:
       What is the purpose of `subtaskId` here?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {

Review comment:
       In addition to the other comments: this method makes it possible for the total number of requests to exceed the total number of committables in the `summary`.
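
A sketch of one possible guard. It assumes the summary is always registered before its committables and that its announced count does not change afterwards; neither is confirmed by the diff. Because `setSummary(...)` can replace the summary, a real check would have to read the current summary's count rather than a value fixed at construction. `GuardedSubtaskSketch` is hypothetical; inside Flink the same check could be written with `Preconditions.checkState`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical variant of SubtaskCommittables.add(...) that refuses surplus committables. */
final class GuardedSubtaskSketch<CommT> {
    private final int announcedTotal; // number of committables announced by the summary
    private final Deque<CommT> requests = new ArrayDeque<>();
    int numDrained; // updated by drain logic, omitted from this sketch
    int numFailed; // updated by drain logic, omitted from this sketch

    GuardedSubtaskSketch(int announcedTotal) {
        this.announcedTotal = announcedTotal;
    }

    void add(CommT committable) {
        int received = requests.size() + numDrained + numFailed;
        if (received >= announcedTotal) {
            throw new IllegalStateException(
                    "Summary announced " + announcedTotal + " committables, but received more");
        }
        requests.add(committable);
    }

    int size() {
        return requests.size();
    }
}
```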

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/SubtaskCommittables.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class SubtaskCommittables<CommT> {
+    private CommittableSummary<CommT> summary;
+    private final Deque<CommitRequestImpl<CommT>> requests;
+    private int numDrained;
+    private int numFailed;
+
+    SubtaskCommittables(CommittableSummary<CommT> summary) {
+        this(summary, Collections.emptyList(), 0, 0);
+    }
+
+    SubtaskCommittables(
+            CommittableSummary<CommT> summary,
+            Collection<CommitRequestImpl<CommT>> requests,
+            int numDrained,
+            int numFailed) {
+        this.summary = checkNotNull(summary);
+        this.requests = new ArrayDeque<>(checkNotNull(requests));
+        this.numDrained = numDrained;
+        this.numFailed = numFailed;
+    }
+
+    void add(CommittableWithLineage<CommT> committable) {
+        add(committable.getCommittable());
+    }
+
+    void add(CommT committable) {
+        requests.add(new CommitRequestImpl<>(committable));
+    }
+
+    boolean hasReceivedAll() {
+        return getNumCommittables() == summary.getNumberOfCommittables();
+    }
+
+    int getNumCommittables() {
+        return requests.size() + numDrained + numFailed;
+    }
+
+    int getNumPending() {
+        return summary.getNumberOfCommittables() - (numDrained + numFailed);
+    }
+
+    int getNumFailed() {
+        return numFailed;
+    }
+
+    boolean isFinished() {
+        return getNumPending() == 0;
+    }
+
+    Stream<CommitRequestImpl<CommT>> getPendingRequests() {
+        return requests.stream().filter(c -> !c.isFinished());
+    }
+
+    List<CommittableWithLineage<CommT>> drainCommitted() {
+        List<CommittableWithLineage<CommT>> committed = new ArrayList<>(requests.size());
+        for (Iterator<CommitRequestImpl<CommT>> iterator = requests.iterator();
+                iterator.hasNext(); ) {
+            CommitRequestImpl<CommT> request = iterator.next();
+            if (!request.isFinished()) {
+                continue;
+            }
+            if (request.getState() == CommitRequestState.FAILED) {
+                numFailed += 1;
+                continue;
+            } else {
+                committed.add(
+                        new CommittableWithLineage<>(
+                                request.getCommittable(),
+                                summary.getCheckpointId().isPresent()
+                                        ? summary.getCheckpointId().getAsLong()
+                                        : null,
+                                summary.getSubtaskId()));
+            }
+            iterator.remove();
+        }
+
+        numDrained += committed.size();
+        return committed;
+    }
+
+    CommittableSummary<CommT> getSummary() {
+        return summary;
+    }
+
+    void setSummary(CommittableSummary<CommT> summary) {
+        this.summary = summary;
+    }
+
+    int getNumDrained() {
+        return numDrained;
+    }
+
+    Deque<CommitRequestImpl<CommT>> getRequests() {
+        return requests;
+    }
+
+    SubtaskCommittables<CommT> merge(SubtaskCommittables<CommT> other) {
+        CommittableSummary<CommT> otherSummary = other.getSummary();
+        checkArgument(otherSummary.getSubtaskId() == this.summary.getSubtaskId());
+        OptionalLong checkpointId = this.summary.getCheckpointId();
+        this.summary =
+                new CommittableSummary<>(
+                        this.summary.getSubtaskId(),
+                        this.summary.getNumberOfCommittables(),

Review comment:
       Should this be `numberOfSubtasks`? How does the merge work if `other` has a different `numberOfSubtasks`? Or should we check that both `numberOfSubtasks` values are equal?
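
In `ofLegacy`, the summary is built as `new CommittableSummary<>(0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0)` next to `new CommittableCollector<>(0, 1)`, which suggests the second constructor argument is the number of subtasks; `merge` passes `getNumberOfCommittables()` into that position. The sketch below uses a hypothetical `SummarySketch` (no checkpoint ID) to show a merge that rejects mismatched parallelism. Summing the committable counts is an assumption about the intended semantics, since the rest of `merge` is not visible here:

```java
/** Hypothetical, simplified summary; field order mirrors the constructor call in ofLegacy. */
final class SummarySketch {
    final int subtaskId;
    final int numberOfSubtasks;
    final int numberOfCommittables;
    final int numberOfPending;
    final int numberOfFailed;

    SummarySketch(
            int subtaskId,
            int numberOfSubtasks,
            int numberOfCommittables,
            int numberOfPending,
            int numberOfFailed) {
        this.subtaskId = subtaskId;
        this.numberOfSubtasks = numberOfSubtasks;
        this.numberOfCommittables = numberOfCommittables;
        this.numberOfPending = numberOfPending;
        this.numberOfFailed = numberOfFailed;
    }

    SummarySketch merge(SummarySketch other) {
        if (other.subtaskId != subtaskId) {
            throw new IllegalArgumentException("cannot merge summaries of different subtasks");
        }
        if (other.numberOfSubtasks != numberOfSubtasks) {
            throw new IllegalArgumentException(
                    "numberOfSubtasks differs: " + numberOfSubtasks + " vs " + other.numberOfSubtasks);
        }
        return new SummarySketch(
                subtaskId,
                numberOfSubtasks, // the parallelism, not a committable count
                numberOfCommittables + other.numberOfCommittables,
                numberOfPending + other.numberOfPending,
                numberOfFailed + other.numberOfFailed);
    }
}
```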

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables;
+    private final CommittableCollector<CommT> collector;
+    private final long checkpointId;
+
+    CheckpointCommittablesImpl(CommittableCollector<CommT> collector, long checkpointId) {
+        this.collector = checkNotNull(collector);
+        this.checkpointId = checkpointId;
+        this.subtasksCommittables = new HashMap<>();
+    }
+
+    CheckpointCommittablesImpl(
+            Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables, long checkpointId) {
+        this.subtasksCommittables = checkNotNull(subtasksCommittables);
+        // dummy collector, never used
+        this.collector = new CommittableCollector<>(0, 1);
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return checkpointId;
+    }
+
+    Collection<SubtaskCommittables<CommT>> getSubtasks() {

Review comment:
       It would be better to use a more suitable name, since "subtask" already has its own meaning as a domain entity.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittablesImpl.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+class CheckpointCommittablesImpl<CommT> implements CheckpointCommittables<CommT> {
+    private final Map<Integer, SubtaskCommittables<CommT>> subtasksCommittables;

Review comment:
       Add a single-line comment for the key, e.g. `// key -> subtaskId`.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime information. This method
+     * should be used for to instantiate a collector for all Sink V2.
+     *
+     * @param context holding runtime of information
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(0, 1);
+        // add a checkpoint with the lowest checkpoint id, this will be merged into the next
+        // checkpoint data, subtask id is arbitrary
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {

Review comment:
       ```suggestion
       public void addCommittableMessage(CommittableMessage<CommT> message) {
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollector.java
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.connector.sink2.Sink.InitContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is responsible to book-keep the committing progress across checkpoints and subtasks.
+ * It handles the emission of committables and the {@link CommittableSummary}.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommittableCollector<CommT> {
+    private static final long EOI = Long.MAX_VALUE;
+    private final NavigableMap<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables;
+    private final int subtaskId;
+    private final int numberOfSubtasks;
+
+    CommittableCollector(int subtaskId, int numberOfSubtasks) {
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+        this.checkpointCommittables = new TreeMap<>();
+    }
+
+    /** For deserialization. */
+    CommittableCollector(Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables) {
+        this(checkpointCommittables, 0, 1);
+    }
+
+    /** For deep-copy. */
+    CommittableCollector(
+            Map<Long, CheckpointCommittablesImpl<CommT>> checkpointCommittables,
+            int subtaskId,
+            int numberOfSubtasks) {
+        this.checkpointCommittables = new TreeMap<>(checkNotNull(checkpointCommittables));
+        this.subtaskId = subtaskId;
+        this.numberOfSubtasks = numberOfSubtasks;
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} based on the current runtime information. This method
+     * should be used to instantiate a collector for all Sink V2 implementations.
+     *
+     * @param context runtime context providing the subtask index and the parallelism
+     * @param <CommT> type of the committable
+     * @return {@link CommittableCollector}
+     */
+    public static <CommT> CommittableCollector<CommT> of(RuntimeContext context) {
+        return new CommittableCollector<>(
+                context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
+    }
+
+    /**
+     * Creates a {@link CommittableCollector} for a list of committables. This method is mainly used
+     * to create a collector from the state of Sink V1.
+     *
+     * @param r list of committables restored from Sink V1 state
+     * @param <CommT> type of committables
+     * @return {@link CommittableCollector}
+     */
+    static <CommT> CommittableCollector<CommT> ofLegacy(List<CommT> r) {
+        CommittableCollector<CommT> committableCollector = new CommittableCollector<>(0, 1);
+        // Add a checkpoint with the lowest checkpoint id; it will be merged into the next
+        // checkpoint's data. The subtask id is arbitrary.
+        CommittableSummary<CommT> summary =
+                new CommittableSummary<>(
+                        0, 1, InitContext.INITIAL_CHECKPOINT_ID, r.size(), r.size(), 0);
+        committableCollector.addSummary(summary);
+        SubtaskCommittables<CommT> subtask =
+                committableCollector
+                        .getCheckpointCommittables(summary)
+                        .getSubtaskCommittables(summary.getSubtaskId());
+        r.forEach(subtask::add);
+        return committableCollector;
+    }
+
+    /**
+     * Adds a {@link CommittableMessage} to the collector to hold it until emission.
+     *
+     * @param message either {@link CommittableSummary} or {@link CommittableWithLineage}
+     */
+    public void addMessage(CommittableMessage<CommT> message) {
+        if (message instanceof CommittableSummary) {
+            addSummary((CommittableSummary<CommT>) message);
+        } else if (message instanceof CommittableWithLineage) {
+            addCommittable((CommittableWithLineage<CommT>) message);
+        }
+    }
+
+    /**
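+     * Returns all {@link CheckpointCommittables} up to and including the requested checkpoint id.
+     * Fully committed checkpoints in that range are removed from the collector.
+     *
+     * @param checkpointId highest checkpoint id to include (inclusive)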
+     * @return collection of {@link CheckpointCommittables}
+     */
+    public Collection<? extends CheckpointCommittables<CommT>> getCheckpointCommittablesUpTo(
+            long checkpointId) {
+        // clean up fully committed previous checkpoints
+        // this wouldn't work with concurrent unaligned checkpoints
+        Collection<CheckpointCommittablesImpl<CommT>> checkpoints =
+                checkpointCommittables.headMap(checkpointId, true).values();
+        checkpoints.removeIf(CheckpointCommittablesImpl::isFinished);
+        return checkpoints;
+    }
+
+    /**
+     * Returns all {@link CheckpointCommittables} that are currently held by the collector.
+     *
+     * @return collection of {@link CheckpointCommittables}
+     */
+    @Nullable
+    public Collection<? extends CheckpointCommittables<CommT>> getEndOfInputCommittables() {
+        return getCheckpointCommittablesUpTo(EOI);
+    }
+
+    /**
+     * Returns whether all {@link CheckpointCommittables} currently held by the collector are either
+     * committed or failed.
+     *
+     * @return {@code true} if every held {@link CheckpointCommittables} is finished
+     */
+    public boolean isFinished() {
+        return checkpointCommittables.values().stream()
+                .allMatch(CheckpointCommittablesImpl::isFinished);
+    }
+
+    /**
+     * Merges all information from an external collector into this collector.
+     *
+     * <p>This method is important during recovery from existing state.
+     *
+     * @param cc other {@link CommittableCollector}
+     */
+    public void merge(CommittableCollector<CommT> cc) {
+        for (Entry<Long, CheckpointCommittablesImpl<CommT>> checkpointEntry :
+                cc.checkpointCommittables.entrySet()) {
+            checkpointCommittables.merge(
+                    checkpointEntry.getKey(),
+                    checkpointEntry.getValue(),
+                    CheckpointCommittablesImpl::merge);
+        }
+    }
+
+    /**
+     * Returns number of subtasks.
+     *
+     * @return number of subtasks
+     */
+    public int getNumberOfSubtasks() {
+        return numberOfSubtasks;
+    }
+
+    /**
+     * Returns subtask id.
+     *
+     * @return subtask id.
+     */
+    public int getSubtaskId() {
+        return subtaskId;
+    }
+
+    /**
+     * Returns a new committable collector that deep copies all internals.
+     *
+     * @return {@link CommittableCollector}
+     */
+    public CommittableCollector<CommT> copy() {
+        return new CommittableCollector<>(
+                checkpointCommittables.entrySet().stream()
+                        .map(e -> Tuple2.of(e.getKey(), e.getValue().copy()))
+                        .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1)),
+                subtaskId,
+                numberOfSubtasks);
+    }
+
+    Collection<CheckpointCommittablesImpl<CommT>> getCheckpointCommittables() {
+        return checkpointCommittables.values();
+    }
+
+    private void addSummary(CommittableSummary<CommT> summary) {
+        checkpointCommittables
+                .computeIfAbsent(
+                        summary.getCheckpointId().orElse(EOI),
+                        key ->
+                                new CheckpointCommittablesImpl<>(
+                                        this, summary.getCheckpointId().orElse(EOI)))
+                .addSummary(summary);
+    }
+
+    private void addCommittable(CommittableWithLineage<CommT> committable) {
+        getCheckpointCommittables(committable).addCommittable(committable);
+    }
+
+    private CheckpointCommittablesImpl<CommT> getCheckpointCommittables(
+            CommittableMessage<CommT> committable) {

Review comment:
       ```suggestion
               OptionalLong checkpointId) {
   ```
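
The bookkeeping in `CommittableCollector` (a `TreeMap` keyed by checkpoint id, `Long.MAX_VALUE` as the end-of-input key, inclusive `headMap` pruning of finished checkpoints, and `Map.merge` during recovery) can be illustrated with a small standalone sketch. It is a simplified model under stated assumptions, not Flink's implementation: `CheckpointBookkeepingSketch`, `Bucket`, `Collector`, and their methods are hypothetical names, committables are plain strings, and the lookup takes an `OptionalLong` in the spirit of the suggestion above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical, simplified model of checkpoint-keyed committable bookkeeping. */
public class CheckpointBookkeepingSketch {

    // Sentinel key for committables that carry no checkpoint id (end of input).
    static final long EOI = Long.MAX_VALUE;

    /** Committables of one checkpoint plus a flag telling whether all are committed. */
    static final class Bucket {
        final long checkpointId;
        final List<String> committables = new ArrayList<>();
        boolean finished;

        Bucket(long checkpointId) {
            this.checkpointId = checkpointId;
        }

        // Combines two buckets for the same checkpoint id; finished only if both are.
        static Bucket merge(Bucket a, Bucket b) {
            Bucket merged = new Bucket(a.checkpointId);
            merged.committables.addAll(a.committables);
            merged.committables.addAll(b.committables);
            merged.finished = a.finished && b.finished;
            return merged;
        }
    }

    static final class Collector {
        private final NavigableMap<Long, Bucket> buckets = new TreeMap<>();

        // Accepts the OptionalLong directly and maps "absent" to the EOI key.
        Bucket bucketFor(OptionalLong checkpointId) {
            return buckets.computeIfAbsent(checkpointId.orElse(EOI), Bucket::new);
        }

        void add(OptionalLong checkpointId, String committable) {
            bucketFor(checkpointId).committables.add(committable);
        }

        // headMap(id, true) is a live view, so removeIf on its values()
        // also deletes the finished entries from the backing TreeMap.
        Collection<Bucket> upTo(long checkpointId) {
            Collection<Bucket> view = buckets.headMap(checkpointId, true).values();
            view.removeIf(b -> b.finished);
            return view;
        }

        // Ids missing here adopt the other collector's bucket object as-is;
        // ids present in both are combined with Bucket.merge.
        void merge(Collector other) {
            other.buckets.forEach((id, bucket) -> buckets.merge(id, bucket, Bucket::merge));
        }

        int size() {
            return buckets.size();
        }
    }

    public static void main(String[] args) {
        Collector c = new Collector();
        c.add(OptionalLong.of(1), "a");
        c.add(OptionalLong.of(2), "b");
        c.add(OptionalLong.empty(), "final");
        c.bucketFor(OptionalLong.of(1)).finished = true;

        System.out.println(c.upTo(2).size()); // 1: checkpoint 1 is pruned, 2 remains
        System.out.println(c.size()); // 2: checkpoint 2 and the EOI bucket
    }
}
```

Because pruning happens as a side effect of reading the inclusive head view, it assumes checkpoints complete in order; the code comment in the diff already notes that this would not hold with concurrent unaligned checkpoints.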

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommitRequestImpl.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.sink.committables;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.Committer;
+
+/**
+ * Internal implementation to commit a specific committable and handle the response.
+ *
+ * @param <CommT> type of committable
+ */
+@Internal
+public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT> {
+
+    private CommT committable;
+    private int numRetries;
+    private CommitRequestState state;
+
+    protected CommitRequestImpl(CommT committable) {
+        this.committable = committable;
+        state = CommitRequestState.RECEIVED;
+    }
+
+    protected CommitRequestImpl(CommT committable, int numRetries, CommitRequestState state) {
+        this.committable = committable;
+        this.numRetries = numRetries;
+        this.state = state;
+    }
+
+    boolean isFinished() {
+        return state.isFinalState();
+    }
+
+    CommitRequestState getState() {
+        return state;
+    }
+
+    @Override
+    public CommT getCommittable() {
+        return committable;
+    }
+
+    @Override
+    public int getNumberOfRetries() {
+        return numRetries;
+    }
+
+    @Override
+    public void signalFailedWithKnownReason(Throwable t) {
+        state = CommitRequestState.FAILED;
+        // add metric later

Review comment:
       ```suggestion
           // TODO(...) add metric later
   ```
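
`CommitRequestImpl` implies a small state machine: a request starts as `RECEIVED`, `signalFailedWithKnownReason` moves it to `FAILED`, and `isFinished()` is true once the state is final. The sketch below models that lifecycle and shows where the metric mentioned in the TODO could hook in. It rests on assumptions: `CommitRequestSketch`, `Request`, `retryLater`, `markCommitted`, and the `RETRY`/`COMMITTED` states are hypothetical (only `RECEIVED`, `FAILED`, and `isFinalState()` appear in the diff), and a shared `AtomicInteger` stands in for a real metric.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified model of a commit request's lifecycle. */
public class CommitRequestSketch {

    // Assumed state set: only RECEIVED and FAILED appear in the diff.
    enum State {
        RECEIVED(false),
        RETRY(false),
        FAILED(true),
        COMMITTED(true);

        private final boolean finalState;

        State(boolean finalState) {
            this.finalState = finalState;
        }

        boolean isFinalState() {
            return finalState;
        }
    }

    static final class Request<CommT> {
        private final CommT committable;
        // Shared counter standing in for the metric the TODO refers to.
        private final AtomicInteger knownFailures;
        private int numRetries;
        private State state = State.RECEIVED;

        Request(CommT committable, AtomicInteger knownFailures) {
            this.committable = committable;
            this.knownFailures = knownFailures;
        }

        CommT getCommittable() {
            return committable;
        }

        int getNumberOfRetries() {
            return numRetries;
        }

        State getState() {
            return state;
        }

        boolean isFinished() {
            return state.isFinalState();
        }

        void signalFailedWithKnownReason(Throwable t) {
            state = State.FAILED;
            knownFailures.incrementAndGet(); // where "add metric later" would hook in
        }

        void retryLater() {
            state = State.RETRY;
            numRetries++;
        }

        void markCommitted() {
            state = State.COMMITTED;
        }
    }

    public static void main(String[] args) {
        AtomicInteger knownFailures = new AtomicInteger();
        Request<String> ok = new Request<>("txn-1", knownFailures);
        ok.retryLater();
        ok.markCommitted();
        Request<String> bad = new Request<>("txn-2", knownFailures);
        bad.signalFailedWithKnownReason(new IllegalStateException("rejected"));

        System.out.println(ok.getNumberOfRetries() + " " + ok.isFinished()); // 1 true
        System.out.println(bad.getState() + " " + knownFailures.get()); // FAILED 1
    }
}
```

Keeping the counter outside the request lets many requests report into one metric, which is roughly the shape a per-operator failure metric would need.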




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org